diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 3b95beeb08..2c4644b4a3 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -27,11 +27,8 @@ from typing import (
Tuple,
)
-import attr
from prometheus_client import Counter
-from twisted.internet import defer
-
from synapse import event_auth
from synapse.api.constants import (
EventContentFields,
@@ -54,11 +51,7 @@ from synapse.event_auth import auth_types_for_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
-from synapse.logging.context import (
- make_deferred_yieldable,
- nested_logging_context,
- run_in_background,
-)
+from synapse.logging.context import nested_logging_context, run_in_background
from synapse.logging.utils import log_function
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
@@ -92,30 +85,6 @@ soft_failed_event_counter = Counter(
)
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _NewEventInfo:
- """Holds information about a received event, ready for passing to _auth_and_persist_events
-
- Attributes:
- event: the received event
-
- claimed_auth_event_map: a map of (type, state_key) => event for the event's
- claimed auth_events.
-
- This can include events which have not yet been persisted, in the case that
- we are backfilling a batch of events.
-
- Note: May be incomplete: if we were unable to find all of the claimed auth
- events. Also, treat the contents with caution: the events might also have
- been rejected, might not yet have been authorized themselves, or they might
- be in the wrong room.
-
- """
-
- event: EventBase
- claimed_auth_event_map: StateMap[EventBase]
-
-
class FederationEventHandler:
"""Handles events that originated from federation.
@@ -1107,7 +1076,7 @@ class FederationEventHandler:
room_version = await self._store.get_room_version(room_id)
- event_map: Dict[str, EventBase] = {}
+ events: List[EventBase] = []
async def get_event(event_id: str) -> None:
with nested_logging_context(event_id):
@@ -1125,8 +1094,7 @@ class FederationEventHandler:
event_id,
)
return
-
- event_map[event.event_id] = event
+ events.append(event)
except Exception as e:
logger.warning(
@@ -1137,11 +1105,29 @@ class FederationEventHandler:
)
await concurrently_execute(get_event, event_ids, 5)
- logger.info("Fetched %i events of %i requested", len(event_map), len(event_ids))
+ logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
+ await self._auth_and_persist_fetched_events(destination, room_id, events)
+
+ async def _auth_and_persist_fetched_events(
+ self, origin: str, room_id: str, events: Iterable[EventBase]
+ ) -> None:
+ """Persist the events fetched by _get_events_and_persist or _get_remote_auth_chain_for_event
+
+ The events to be persisted must be outliers.
+
+ We first sort the events to make sure that we process each event's auth_events
+ before the event itself, and then auth and persist them.
+
+ Notifies about the events where appropriate.
+
+ Params:
+ origin: where the events came from
+ room_id: the room that the events are meant to be in (though this has
+ not yet been checked)
+ events: the events that have been fetched
+ """
+ event_map = {event.event_id: event for event in events}
- # we now need to auth the events in an order which ensures that each event's
- # auth_events are authed before the event itself.
- #
# XXX: it might be possible to kick this process off in parallel with fetching
# the events.
while event_map:
@@ -1168,22 +1154,18 @@ class FederationEventHandler:
"Persisting %i of %i remaining events", len(roots), len(event_map)
)
- await self._auth_and_persist_fetched_events(destination, room_id, roots)
+ await self._auth_and_persist_fetched_events_inner(origin, room_id, roots)
for ev in roots:
del event_map[ev.event_id]
- async def _auth_and_persist_fetched_events(
+ async def _auth_and_persist_fetched_events_inner(
self, origin: str, room_id: str, fetched_events: Collection[EventBase]
) -> None:
- """Persist the events fetched by _get_events_and_persist.
-
- The events should not depend on one another, e.g. this should be used to persist
- a bunch of outliers, but not a chunk of individual events that depend
- on each other for state calculations.
+ """Helper for _auth_and_persist_fetched_events
- We also assume that all of the auth events for all of the events have already
- been persisted.
+ Persists a batch of events where we have (theoretically) already persisted all
+ of their auth events.
Notifies about the events where appropriate.
@@ -1191,7 +1173,7 @@ class FederationEventHandler:
origin: where the events came from
room_id: the room that the events are meant to be in (though this has
not yet been checked)
- event_id: map from event_id -> event for the fetched events
+ fetched_events: the events to persist
"""
# get all the auth events for all the events in this batch. By now, they should
# have been persisted.
@@ -1203,47 +1185,37 @@ class FederationEventHandler:
allow_rejected=True,
)
- event_infos = []
- for event in fetched_events:
- auth = {}
- for auth_event_id in event.auth_event_ids():
- ae = persisted_events.get(auth_event_id)
- if ae:
- auth[(ae.type, ae.state_key)] = ae
- else:
- logger.info("Missing auth event %s", auth_event_id)
-
- event_infos.append(_NewEventInfo(event, auth))
-
- if not event_infos:
- return
+ room_version = await self._store.get_room_version_id(room_id)
+ room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
- async def prep(ev_info: _NewEventInfo) -> EventContext:
- event = ev_info.event
+ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
with nested_logging_context(suffix=event.event_id):
- res = await self._state_handler.compute_event_context(event)
- res = await self._check_event_auth(
- origin,
- event,
- res,
- claimed_auth_event_map=ev_info.claimed_auth_event_map,
- )
- return res
+ auth = {}
+ for auth_event_id in event.auth_event_ids():
+ ae = persisted_events.get(auth_event_id)
+ if not ae:
+ logger.warning(
+ "Event %s relies on auth_event %s, which could not be found.",
+ event,
+ auth_event_id,
+ )
+ # the fact we can't find the auth event doesn't mean it doesn't
+ # exist, which means it is premature to reject `event`. Instead we
+ # just ignore it for now.
+ return None
+ auth[(ae.type, ae.state_key)] = ae
- contexts = await make_deferred_yieldable(
- defer.gatherResults(
- [run_in_background(prep, ev_info) for ev_info in event_infos],
- consumeErrors=True,
- )
- )
+ context = EventContext.for_outlier()
+ try:
+ event_auth.check(room_version_obj, event, auth_events=auth)
+ except AuthError as e:
+ logger.warning("Rejecting %r because %s", event, e)
+ context.rejected = RejectedReason.AUTH_ERROR
- await self.persist_events_and_notify(
- room_id,
- [
- (ev_info.event, context)
- for ev_info, context in zip(event_infos, contexts)
- ],
- )
+ return event, context
+
+ events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
+ await self.persist_events_and_notify(room_id, tuple(events_to_persist))
async def _check_event_auth(
self,
@@ -1251,7 +1223,6 @@ class FederationEventHandler:
event: EventBase,
context: EventContext,
state: Optional[Iterable[EventBase]] = None,
- claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
backfilled: bool = False,
) -> EventContext:
"""
@@ -1267,43 +1238,36 @@ class FederationEventHandler:
The state events used to check the event for soft-fail. If this is
not provided the current state events will be used.
- claimed_auth_event_map:
- A map of (type, state_key) => event for the event's claimed auth_events.
- Possibly incomplete, and possibly including events that are not yet
- persisted, or authed, or in the right room.
-
- Only populated when populating outliers.
-
backfilled: True if the event was backfilled.
Returns:
The updated context object.
"""
- # claimed_auth_event_map should be given iff the event is an outlier
- assert bool(claimed_auth_event_map) == event.internal_metadata.outlier
+ # This method should only be used for non-outliers
+ assert not event.internal_metadata.outlier
room_version = await self._store.get_room_version_id(event.room_id)
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
- if claimed_auth_event_map:
- # if we have a copy of the auth events from the event, use that as the
- # basis for auth.
- auth_events = claimed_auth_event_map
- else:
- # otherwise, we calculate what the auth events *should* be, and use that
- prev_state_ids = await context.get_prev_state_ids()
- auth_events_ids = self._event_auth_handler.compute_auth_events(
- event, prev_state_ids, for_verification=True
- )
- auth_events_x = await self._store.get_events(auth_events_ids)
- auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
+ # calculate what the auth events *should* be, to use as a basis for auth.
+ prev_state_ids = await context.get_prev_state_ids()
+ auth_events_ids = self._event_auth_handler.compute_auth_events(
+ event, prev_state_ids, for_verification=True
+ )
+ auth_events_x = await self._store.get_events(auth_events_ids)
+ calculated_auth_event_map = {
+ (e.type, e.state_key): e for e in auth_events_x.values()
+ }
try:
(
context,
auth_events_for_auth,
) = await self._update_auth_events_and_context_for_auth(
- origin, event, context, auth_events
+ origin,
+ event,
+ context,
+ calculated_auth_event_map=calculated_auth_event_map,
)
except Exception:
# We don't really mind if the above fails, so lets not fail
@@ -1315,7 +1279,7 @@ class FederationEventHandler:
"Ignoring failure and continuing processing of event.",
event.event_id,
)
- auth_events_for_auth = auth_events
+ auth_events_for_auth = calculated_auth_event_map
try:
event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth)
@@ -1451,7 +1415,7 @@ class FederationEventHandler:
origin: str,
event: EventBase,
context: EventContext,
- input_auth_events: StateMap[EventBase],
+ calculated_auth_event_map: StateMap[EventBase],
) -> Tuple[EventContext, StateMap[EventBase]]:
"""Helper for _check_event_auth. See there for docs.
@@ -1469,19 +1433,17 @@ class FederationEventHandler:
event:
context:
- input_auth_events:
- Map from (event_type, state_key) to event
-
- Normally, our calculated auth_events based on the state of the room
- at the event's position in the DAG, though occasionally (eg if the
- event is an outlier), may be the auth events claimed by the remote
- server.
+ calculated_auth_event_map:
+ Our calculated auth_events based on the state of the room
+ at the event's position in the DAG.
Returns:
updated context, updated auth event map
"""
- # take a copy of input_auth_events before we modify it.
- auth_events: MutableStateMap[EventBase] = dict(input_auth_events)
+ assert not event.internal_metadata.outlier
+
+ # take a copy of calculated_auth_event_map before we modify it.
+ auth_events: MutableStateMap[EventBase] = dict(calculated_auth_event_map)
event_auth_events = set(event.auth_event_ids())
@@ -1505,73 +1467,22 @@ class FederationEventHandler:
# If we don't have all the auth events, we need to get them.
logger.info("auth_events contains unknown events: %s", missing_auth)
try:
- try:
- remote_auth_chain = await self._federation_client.get_event_auth(
- origin, event.room_id, event.event_id
- )
- except RequestSendFailed as e1:
- # The other side isn't around or doesn't implement the
- # endpoint, so lets just bail out.
- logger.info("Failed to get event auth from remote: %s", e1)
- return context, auth_events
-
- seen_remotes = await self._store.have_seen_events(
- event.room_id, [e.event_id for e in remote_auth_chain]
+ await self._get_remote_auth_chain_for_event(
+ origin, event.room_id, event.event_id
)
-
- for auth_event in remote_auth_chain:
- if auth_event.event_id in seen_remotes:
- continue
-
- if auth_event.event_id == event.event_id:
- continue
-
- try:
- auth_ids = auth_event.auth_event_ids()
- auth = {
- (e.type, e.state_key): e
- for e in remote_auth_chain
- if e.event_id in auth_ids or e.type == EventTypes.Create
- }
- auth_event.internal_metadata.outlier = True
-
- logger.debug(
- "_check_event_auth %s missing_auth: %s",
- event.event_id,
- auth_event.event_id,
- )
- missing_auth_event_context = (
- await self._state_handler.compute_event_context(auth_event)
- )
-
- missing_auth_event_context = await self._check_event_auth(
- origin,
- auth_event,
- missing_auth_event_context,
- claimed_auth_event_map=auth,
- )
- await self.persist_events_and_notify(
- event.room_id, [(auth_event, missing_auth_event_context)]
- )
-
- if auth_event.event_id in event_auth_events:
- auth_events[
- (auth_event.type, auth_event.state_key)
- ] = auth_event
- except AuthError:
- pass
-
except Exception:
logger.exception("Failed to get auth chain")
-
- if event.internal_metadata.is_outlier():
- # XXX: given that, for an outlier, we'll be working with the
- # event's *claimed* auth events rather than those we calculated:
- # (a) is there any point in this test, since different_auth below will
- # obviously be empty
- # (b) alternatively, why don't we do it earlier?
- logger.info("Skipping auth_event fetch for outlier")
- return context, auth_events
+ else:
+ # load any auth events we might have persisted from the database. This
+ # has the side-effect of correctly setting the rejected_reason on them.
+ auth_events.update(
+ {
+ (ae.type, ae.state_key): ae
+ for ae in await self._store.get_events_as_list(
+ missing_auth, allow_rejected=True
+ )
+ }
+ )
different_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
@@ -1636,6 +1547,45 @@ class FederationEventHandler:
return context, auth_events
+ async def _get_remote_auth_chain_for_event(
+ self, destination: str, room_id: str, event_id: str
+ ) -> None:
+ """If we are missing some of an event's auth events, attempt to request them
+
+ Args:
+ destination: where to fetch the auth tree from
+ room_id: the room in which we are lacking auth events
+ event_id: the event for which we are lacking auth events
+ """
+ try:
+ remote_event_map = {
+ e.event_id: e
+ for e in await self._federation_client.get_event_auth(
+ destination, room_id, event_id
+ )
+ }
+ except RequestSendFailed as e1:
+ # The other side isn't around or doesn't implement the
+ # endpoint, so lets just bail out.
+ logger.info("Failed to get event auth from remote: %s", e1)
+ return
+
+ logger.info("/event_auth returned %i events", len(remote_event_map))
+
+ # `event` may be returned, but we should not yet process it.
+ remote_event_map.pop(event_id, None)
+
+ # nor should we reprocess any events we have already seen.
+ seen_remotes = await self._store.have_seen_events(
+ room_id, remote_event_map.keys()
+ )
+ for s in seen_remotes:
+ remote_event_map.pop(s, None)
+
+ await self._auth_and_persist_fetched_events(
+ destination, room_id, remote_event_map.values()
+ )
+
async def _update_context_for_auth_events(
self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
) -> EventContext:
|