diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index f640b417b3..5a2f2e5ebb 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import itertools
import logging
from http import HTTPStatus
from typing import (
@@ -45,7 +46,7 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
from synapse.event_auth import (
auth_types_for_event,
check_auth_rules_for_event,
@@ -214,7 +215,7 @@ class FederationEventHandler:
if missing_prevs:
# We only backfill backwards to the min depth.
- min_depth = await self.get_min_depth_for_context(pdu.room_id)
+ min_depth = await self._store.get_min_depth(pdu.room_id)
logger.debug("min_depth: %d", min_depth)
if min_depth is not None and pdu.depth > min_depth:
@@ -390,9 +391,122 @@ class FederationEventHandler:
prev_member_event,
)
+ async def process_remote_join(
+ self,
+ origin: str,
+ room_id: str,
+ auth_events: List[EventBase],
+ state: List[EventBase],
+ event: EventBase,
+ room_version: RoomVersion,
+ ) -> int:
+ """Persists the events returned by a send_join
+
+ Checks the auth chain is valid (and passes auth checks) for the
+ state and event. Then persists the auth chain and state atomically.
+ Persists the event separately. Notifies about the persisted events
+ where appropriate.
+
+ Will attempt to fetch missing auth events.
+
+ Args:
+ origin: Where the events came from
+ room_id,
+ auth_events
+ state
+ event
+ room_version: The room version we expect this room to have, and
+ will raise if it doesn't match the version in the create event.
+ """
+ events_to_context = {}
+ for e in itertools.chain(auth_events, state):
+ e.internal_metadata.outlier = True
+ events_to_context[e.event_id] = EventContext.for_outlier()
+
+ event_map = {
+ e.event_id: e for e in itertools.chain(auth_events, state, [event])
+ }
+
+ create_event = None
+ for e in auth_events:
+ if (e.type, e.state_key) == (EventTypes.Create, ""):
+ create_event = e
+ break
+
+ if create_event is None:
+ # If the state doesn't have a create event then the room is
+ # invalid, and it would fail auth checks anyway.
+ raise SynapseError(400, "No create event in state")
+
+ room_version_id = create_event.content.get(
+ "room_version", RoomVersions.V1.identifier
+ )
+
+ if room_version.identifier != room_version_id:
+ raise SynapseError(400, "Room version mismatch")
+
+ missing_auth_events = set()
+ for e in itertools.chain(auth_events, state, [event]):
+ for e_id in e.auth_event_ids():
+ if e_id not in event_map:
+ missing_auth_events.add(e_id)
+
+ for e_id in missing_auth_events:
+ m_ev = await self._federation_client.get_pdu(
+ [origin],
+ e_id,
+ room_version=room_version,
+ outlier=True,
+ timeout=10000,
+ )
+ if m_ev and m_ev.event_id == e_id:
+ event_map[e_id] = m_ev
+ else:
+ logger.info("Failed to find auth event %r", e_id)
+
+ for e in itertools.chain(auth_events, state, [event]):
+ auth_for_e = [
+ event_map[e_id] for e_id in e.auth_event_ids() if e_id in event_map
+ ]
+ if create_event:
+ auth_for_e.append(create_event)
+
+ try:
+ validate_event_for_room_version(room_version, e)
+ check_auth_rules_for_event(room_version, e, auth_for_e)
+ except SynapseError as err:
+ # we may get SynapseErrors here as well as AuthErrors. For
+ # instance, there are a couple of (ancient) events in some
+ # rooms whose senders do not have the correct sigil; these
+ # cause SynapseErrors in auth.check. We don't want to give up
+ # the attempt to federate altogether in such cases.
+
+ logger.warning("Rejecting %s because %s", e.event_id, err.msg)
+
+ if e == event:
+ raise
+ events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+
+ if auth_events or state:
+ await self.persist_events_and_notify(
+ room_id,
+ [
+ (e, events_to_context[e.event_id])
+ for e in itertools.chain(auth_events, state)
+ ],
+ )
+
+ new_event_context = await self._state_handler.compute_event_context(
+ event, old_state=state
+ )
+
+ return await self.persist_events_and_notify(
+ room_id, [(event, new_event_context)]
+ )
+
@log_function
async def backfill(
- self, dest: str, room_id: str, limit: int, extremities: List[str]
+ self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
) -> None:
"""Trigger a backfill request to `dest` for the given `room_id`
@@ -1116,14 +1230,12 @@ class FederationEventHandler:
await concurrently_execute(get_event, event_ids, 5)
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
- await self._auth_and_persist_fetched_events(destination, room_id, events)
+ await self._auth_and_persist_outliers(room_id, events)
- async def _auth_and_persist_fetched_events(
- self, origin: str, room_id: str, events: Iterable[EventBase]
+ async def _auth_and_persist_outliers(
+ self, room_id: str, events: Iterable[EventBase]
) -> None:
- """Persist the events fetched by _get_events_and_persist or _get_remote_auth_chain_for_event
-
- The events to be persisted must be outliers.
+ """Persist a batch of outlier events fetched from remote servers.
We first sort the events to make sure that we process each event's auth_events
before the event itself, and then auth and persist them.
@@ -1131,7 +1243,6 @@ class FederationEventHandler:
Notifies about the events where appropriate.
Params:
- origin: where the events came from
room_id: the room that the events are meant to be in (though this has
not yet been checked)
events: the events that have been fetched
@@ -1167,15 +1278,15 @@ class FederationEventHandler:
shortstr(e.event_id for e in roots),
)
- await self._auth_and_persist_fetched_events_inner(origin, room_id, roots)
+ await self._auth_and_persist_outliers_inner(room_id, roots)
for ev in roots:
del event_map[ev.event_id]
- async def _auth_and_persist_fetched_events_inner(
- self, origin: str, room_id: str, fetched_events: Collection[EventBase]
+ async def _auth_and_persist_outliers_inner(
+ self, room_id: str, fetched_events: Collection[EventBase]
) -> None:
- """Helper for _auth_and_persist_fetched_events
+ """Helper for _auth_and_persist_outliers
Persists a batch of events where we have (theoretically) already persisted all
of their auth events.
@@ -1203,7 +1314,7 @@ class FederationEventHandler:
def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
with nested_logging_context(suffix=event.event_id):
- auth = {}
+ auth = []
for auth_event_id in event.auth_event_ids():
ae = persisted_events.get(auth_event_id)
if not ae:
@@ -1216,7 +1327,7 @@ class FederationEventHandler:
# exist, which means it is premature to reject `event`. Instead we
# just ignore it for now.
return None
- auth[(ae.type, ae.state_key)] = ae
+ auth.append(ae)
context = EventContext.for_outlier()
try:
@@ -1256,6 +1367,10 @@ class FederationEventHandler:
Returns:
The updated context object.
+
+ Raises:
+ AuthError if we were unable to find copies of the event's auth events.
+ (Most other failures just cause us to set `context.rejected`.)
"""
# This method should only be used for non-outliers
assert not event.internal_metadata.outlier
@@ -1272,7 +1387,26 @@ class FederationEventHandler:
context.rejected = RejectedReason.AUTH_ERROR
return context
- # calculate what the auth events *should* be, to use as a basis for auth.
+ # next, check that we have all of the event's auth events.
+ #
+ # Note that this can raise AuthError, which we want to propagate to the
+ # caller rather than swallow with `context.rejected` (since we cannot be
+ # certain that there is a permanent problem with the event).
+ claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
+ origin, event
+ )
+
+ # ... and check that the event passes auth at those auth events.
+ try:
+ check_auth_rules_for_event(room_version_obj, event, claimed_auth_events)
+ except AuthError as e:
+ logger.warning(
+ "While checking auth of %r against auth_events: %s", event, e
+ )
+ context.rejected = RejectedReason.AUTH_ERROR
+ return context
+
+ # now check auth against what we think the auth events *should* be.
prev_state_ids = await context.get_prev_state_ids()
auth_events_ids = self._event_auth_handler.compute_auth_events(
event, prev_state_ids, for_verification=True
@@ -1305,7 +1439,9 @@ class FederationEventHandler:
auth_events_for_auth = calculated_auth_event_map
try:
- check_auth_rules_for_event(room_version_obj, event, auth_events_for_auth)
+ check_auth_rules_for_event(
+ room_version_obj, event, auth_events_for_auth.values()
+ )
except AuthError as e:
logger.warning("Failed auth resolution for %r because %s", event, e)
context.rejected = RejectedReason.AUTH_ERROR
@@ -1403,11 +1539,9 @@ class FederationEventHandler:
current_state_ids_list = [
e for k, e in current_state_ids.items() if k in auth_types
]
-
- auth_events_map = await self._store.get_events(current_state_ids_list)
- current_auth_events = {
- (e.type, e.state_key): e for e in auth_events_map.values()
- }
+ current_auth_events = await self._store.get_events_as_list(
+ current_state_ids_list
+ )
try:
check_auth_rules_for_event(room_version_obj, event, current_auth_events)
@@ -1472,6 +1606,9 @@ class FederationEventHandler:
# if we have missing events, we need to fetch those events from somewhere.
#
# we start by checking if they are in the store, and then try calling /event_auth/.
+ #
+ # TODO: this code is now redundant, since it should be impossible for us to
+ # get here without already having the auth events.
if missing_auth:
have_events = await self._store.have_seen_events(
event.room_id, missing_auth
@@ -1575,7 +1712,7 @@ class FederationEventHandler:
logger.info(
"After state res: updating auth_events with new state %s",
{
- (d.type, d.state_key): d.event_id
+ d
for d in new_state.values()
if auth_events.get((d.type, d.state_key)) != d
},
@@ -1589,6 +1726,75 @@ class FederationEventHandler:
return context, auth_events
+ async def _load_or_fetch_auth_events_for_event(
+ self, destination: str, event: EventBase
+ ) -> Collection[EventBase]:
+ """Fetch this event's auth_events, from database or remote
+
+ Loads any of the auth_events that we already have from the database/cache. If
+ there are any that are missing, calls /event_auth to get the complete auth
+ chain for the event (and then attempts to load the auth_events again).
+
+ If any of the auth_events cannot be found, raises an AuthError. This can happen
+ for a number of reasons; eg: the events don't exist, or we were unable to talk
+ to `destination`, or we couldn't validate the signature on the event (which
+ in turn has multiple potential causes).
+
+ Args:
+ destination: where to send the /event_auth request. Typically the server
+ that sent us `event` in the first place.
+ event: the event whose auth_events we want
+
+ Returns:
+ all of the events in `event.auth_events`, after deduplication
+
+ Raises:
+ AuthError if we were unable to fetch the auth_events for any reason.
+ """
+ event_auth_event_ids = set(event.auth_event_ids())
+ event_auth_events = await self._store.get_events(
+ event_auth_event_ids, allow_rejected=True
+ )
+ missing_auth_event_ids = event_auth_event_ids.difference(
+ event_auth_events.keys()
+ )
+ if not missing_auth_event_ids:
+ return event_auth_events.values()
+
+ logger.info(
+ "Event %s refers to unknown auth events %s: fetching auth chain",
+ event,
+ missing_auth_event_ids,
+ )
+ try:
+ await self._get_remote_auth_chain_for_event(
+ destination, event.room_id, event.event_id
+ )
+ except Exception as e:
+ logger.warning("Failed to get auth chain for %s: %s", event, e)
+ # in this case, it's very likely we still won't have all the auth
+ # events - but we pick that up below.
+
+ # try to fetch the auth events we missed list time.
+ extra_auth_events = await self._store.get_events(
+ missing_auth_event_ids, allow_rejected=True
+ )
+ missing_auth_event_ids.difference_update(extra_auth_events.keys())
+ event_auth_events.update(extra_auth_events)
+ if not missing_auth_event_ids:
+ return event_auth_events.values()
+
+ # we still don't have all the auth events.
+ logger.warning(
+ "Missing auth events for %s: %s",
+ event,
+ shortstr(missing_auth_event_ids),
+ )
+ # the fact we can't find the auth event doesn't mean it doesn't
+ # exist, which means it is premature to store `event` as rejected.
+ # instead we raise an AuthError, which will make the caller ignore it.
+ raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
+
async def _get_remote_auth_chain_for_event(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1624,9 +1830,7 @@ class FederationEventHandler:
for s in seen_remotes:
remote_event_map.pop(s, None)
- await self._auth_and_persist_fetched_events(
- destination, room_id, remote_event_map.values()
- )
+ await self._auth_and_persist_outliers(room_id, remote_event_map.values())
async def _update_context_for_auth_events(
self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
@@ -1696,16 +1900,27 @@ class FederationEventHandler:
# persist_events_and_notify directly.)
assert not event.internal_metadata.outlier
- try:
- if (
- not backfilled
- and not context.rejected
- and (await self._store.get_min_depth(event.room_id)) <= event.depth
- ):
+ if not backfilled and not context.rejected:
+ min_depth = await self._store.get_min_depth(event.room_id)
+ if min_depth is None or min_depth > event.depth:
+ # XXX richvdh 2021/10/07: I don't really understand what this
+ # condition is doing. I think it's trying not to send pushes
+ # for events that predate our join - but that's not really what
+ # min_depth means, and anyway ancient events are a more general
+ # problem.
+ #
+ # for now I'm just going to log about it.
+ logger.info(
+ "Skipping push actions for old event with depth %s < %s",
+ event.depth,
+ min_depth,
+ )
+ else:
await self._action_generator.handle_push_actions_for_event(
event, context
)
+ try:
await self.persist_events_and_notify(
event.room_id, [(event, context)], backfilled=backfilled
)
@@ -1837,6 +2052,3 @@ class FederationEventHandler:
len(ev.auth_event_ids()),
)
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
-
- async def get_min_depth_for_context(self, context: str) -> int:
- return await self._store.get_min_depth(context)
|