diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 87a0608359..ace7adcffb 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
import itertools
import logging
from http import HTTPStatus
@@ -28,7 +29,7 @@ from typing import (
Tuple,
)
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from synapse import event_auth
from synapse.api.constants import (
@@ -50,19 +51,28 @@ from synapse.api.errors import (
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
from synapse.event_auth import (
auth_types_for_event,
- check_auth_rules_for_event,
+ check_state_dependent_auth_rules,
+ check_state_independent_auth_rules,
validate_event_for_room_version,
)
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
-from synapse.logging.context import nested_logging_context, run_in_background
+from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import (
+ SynapseTags,
+ set_tag,
+ start_active_span,
+ tag_args,
+ trace,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.state import StateResolutionStore
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -88,6 +98,36 @@ soft_failed_event_counter = Counter(
"Events received over federation that we marked as soft_failed",
)
+# Added to debug performance and track progress on optimizations
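+# (observed via `backfill_processing_after_timer.time()` around the
+# post-response processing in `backfill` below)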
+backfill_processing_after_timer = Histogram(
+ "synapse_federation_backfill_processing_after_time_seconds",
+ "sec",
+ [],
+ buckets=(
+ 0.1,
+ 0.25,
+ 0.5,
+ 1.0,
+ 2.5,
+ 5.0,
+ 7.5,
+ 10.0,
+ 15.0,
+ 20.0,
+ 25.0,
+ 30.0,
+ 40.0,
+ 50.0,
+ 60.0,
+ 80.0,
+ 100.0,
+ 120.0,
+ 150.0,
+ 180.0,
+ "+Inf",
+ ),
+)
+
class FederationEventHandler:
"""Handles events that originated from federation.
@@ -274,7 +314,18 @@ class FederationEventHandler:
affected=pdu.event_id,
)
- await self._process_received_pdu(origin, pdu, state_ids=None)
+ try:
+ context = await self._state_handler.compute_event_context(pdu)
+ await self._process_received_pdu(origin, pdu, context)
+ except PartialStateConflictError:
+ # The room was un-partial stated while we were processing the PDU.
+ # Try once more, with full state this time.
+ logger.info(
+ "Room %s was un-partial stated while processing the PDU, trying again.",
+ room_id,
+ )
+ context = await self._state_handler.compute_event_context(pdu)
+ await self._process_received_pdu(origin, pdu, context)
async def on_send_membership_event(
self, origin: str, event: EventBase
@@ -304,7 +355,11 @@ class FederationEventHandler:
The event and context of the event after inserting it into the room graph.
Raises:
+ RuntimeError if any prev_events are missing
SynapseError if the event is not accepted into the room
+ PartialStateConflictError if the room was un-partial stated in between
+ computing the state at the event and persisting it. The caller should
+ retry exactly once in this case.
"""
logger.debug(
"on_send_membership_event: Got event: %s, signatures: %s",
@@ -333,7 +388,7 @@ class FederationEventHandler:
event.internal_metadata.send_on_behalf_of = origin
context = await self._state_handler.compute_event_context(event)
- context = await self._check_event_auth(origin, event, context)
+ await self._check_event_auth(origin, event, context)
if context.rejected:
raise SynapseError(
403, f"{event.membership} event was rejected", Codes.FORBIDDEN
@@ -361,7 +416,7 @@ class FederationEventHandler:
# need to.
await self._event_creation_handler.cache_joined_hosts_for_event(event, context)
- await self._check_for_soft_fail(event, None, origin=origin)
+ await self._check_for_soft_fail(event, context=context, origin=origin)
await self._run_push_actions_and_persist_event(event, context)
return event, context
@@ -391,6 +446,7 @@ class FederationEventHandler:
prev_member_event,
)
+ @trace
async def process_remote_join(
self,
origin: str,
@@ -422,6 +478,8 @@ class FederationEventHandler:
Raises:
SynapseError if the response is in some way invalid.
+ PartialStateConflictError if the homeserver is already in the room and it
+ has been un-partial stated.
"""
create_event = None
for e in state:
@@ -469,7 +527,7 @@ class FederationEventHandler:
partial_state=partial_state,
)
- context = await self._check_event_auth(origin, event, context)
+ await self._check_event_auth(origin, event, context)
if context.rejected:
raise SynapseError(400, "Join event was rejected")
@@ -517,31 +575,36 @@ class FederationEventHandler:
#
# This is the same operation as we do when we receive a regular event
# over federation.
- state_ids = await self._resolve_state_at_missing_prevs(destination, event)
-
- # build a new state group for it if need be
- context = await self._state_handler.compute_event_context(
- event,
- state_ids_before_event=state_ids,
+ context = await self._compute_event_context_with_maybe_missing_prevs(
+ destination, event
)
if context.partial_state:
# this can happen if some or all of the event's prev_events still have
- # partial state - ie, an event has an earlier stream_ordering than one
- # or more of its prev_events, so we de-partial-state it before its
- # prev_events.
+ # partial state. We were careful to only pick events from the db without
+ # partial-state prev events, so that implies that a prev event has
+ # been persisted (with partial state) since we did the query.
#
- # TODO(faster_joins): we probably need to be more intelligent, and
- # exclude partial-state prev_events from consideration
+ # So, let's just ignore `event` for now; when we re-run the db query
+ # we should instead get its partial-state prev event, which we will
+ # de-partial-state, and then come back to event.
logger.warning(
- "%s still has partial state: can't de-partial-state it yet",
+ "%s still has prev_events with partial state: can't de-partial-state it yet",
event.event_id,
)
return
+
+ # since the state at this event has changed, we should now re-evaluate
+ # whether it should have been rejected. We must already have all of the
+ # auth events (from last time we went round this path), so there is no
+ # need to pass the origin.
+ await self._check_event_auth(None, event, context)
+
await self._store.update_state_for_partial_state_event(event, context)
self._state_storage_controller.notify_event_un_partial_stated(
event.event_id
)
+ @trace
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> None:
@@ -571,21 +634,23 @@ class FederationEventHandler:
if not events:
return
- # if there are any events in the wrong room, the remote server is buggy and
- # should not be trusted.
- for ev in events:
- if ev.room_id != room_id:
- raise InvalidResponseError(
- f"Remote server {dest} returned event {ev.event_id} which is in "
- f"room {ev.room_id}, when we were backfilling in {room_id}"
- )
+ with backfill_processing_after_timer.time():
+ # if there are any events in the wrong room, the remote server is buggy and
+ # should not be trusted.
+ for ev in events:
+ if ev.room_id != room_id:
+ raise InvalidResponseError(
+ f"Remote server {dest} returned event {ev.event_id} which is in "
+ f"room {ev.room_id}, when we were backfilling in {room_id}"
+ )
- await self._process_pulled_events(
- dest,
- events,
- backfilled=True,
- )
+ await self._process_pulled_events(
+ dest,
+ events,
+ backfilled=True,
+ )
+ @trace
async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
) -> None:
@@ -686,8 +751,9 @@ class FederationEventHandler:
logger.info("Got %d prev_events", len(missing_events))
await self._process_pulled_events(origin, missing_events, backfilled=False)
+ @trace
async def _process_pulled_events(
- self, origin: str, events: Iterable[EventBase], backfilled: bool
+ self, origin: str, events: Collection[EventBase], backfilled: bool
) -> None:
"""Process a batch of events we have pulled from a remote server
@@ -702,6 +768,15 @@ class FederationEventHandler:
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+ str([event.event_id for event in events]),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+ str(len(events)),
+ )
+ set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
@@ -724,6 +799,8 @@ class FederationEventHandler:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
+ @trace
+ @tag_args
async def _process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None:
@@ -748,10 +825,24 @@ class FederationEventHandler:
"""
logger.info("Processing pulled event %s", event)
- # these should not be outliers.
- assert (
- not event.internal_metadata.is_outlier()
- ), "pulled event unexpectedly flagged as outlier"
+ # This function should not be used to persist outliers (use something
+ # else) because it does a bunch of operations that are unnecessary for
+ # outliers (in particular, it makes sure we have all the prev_events
+ # and resolves the state across those prev events). If you happen to run
+ # into a situation where the event you're trying to process/backfill is
+ # marked as an `outlier`, then you should update that spot to return an
+ # `EventBase` copy that doesn't have the `outlier` flag set.
+ #
+ # `EventBase` is used to represent both an event we have not yet
+ # persisted, and one that we have persisted and now keep in the cache.
+ # In an ideal world this method would only be called with the first type
+ # of event, but in practice that's not the case: for example, you could
+ # get an event from the cache that is marked as an `outlier` (if so, that
+ # spot should be fixed up too).
+ assert not event.internal_metadata.is_outlier(), (
+ "Outlier event passed to _process_pulled_event. "
+ "To persist an event as a non-outlier, make sure to pass in a copy "
+ "without `event.internal_metadata.outlier = True`."
+ )
event_id = event.event_id
@@ -761,7 +852,7 @@ class FederationEventHandler:
if existing:
if not existing.internal_metadata.is_outlier():
logger.info(
- "Ignoring received event %s which we have already seen",
+ "_process_pulled_event: Ignoring received event %s which we have already seen",
event_id,
)
return
@@ -774,28 +865,56 @@ class FederationEventHandler:
return
try:
- state_ids = await self._resolve_state_at_missing_prevs(origin, event)
- # TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does
- # not return partial state
+ try:
+ context = await self._compute_event_context_with_maybe_missing_prevs(
+ origin, event
+ )
+ await self._process_received_pdu(
+ origin,
+ event,
+ context,
+ backfilled=backfilled,
+ )
+ except PartialStateConflictError:
+ # The room was un-partial stated while we were processing the event.
+ # Try once more, with full state this time.
+ context = await self._compute_event_context_with_maybe_missing_prevs(
+ origin, event
+ )
- await self._process_received_pdu(
- origin, event, state_ids=state_ids, backfilled=backfilled
- )
+ # We ought to have full state now, barring some unlikely race where we left and
+ # rejoined the room in the background.
+ if context.partial_state:
+ raise AssertionError(
+ f"Event {event.event_id} still has a partial resolved state "
+ f"after room {event.room_id} was un-partial stated"
+ )
+
+ await self._process_received_pdu(
+ origin,
+ event,
+ context,
+ backfilled=backfilled,
+ )
except FederationError as e:
if e.code == 403:
logger.warning("Pulled event %s failed history check.", event_id)
else:
raise
- async def _resolve_state_at_missing_prevs(
+ @trace
+ async def _compute_event_context_with_maybe_missing_prevs(
self, dest: str, event: EventBase
- ) -> Optional[StateMap[str]]:
- """Calculate the state at an event with missing prev_events.
+ ) -> EventContext:
+ """Build an EventContext structure for a non-outlier event whose prev_events may
+ be missing.
- This is used when we have pulled a batch of events from a remote server, and
- still don't have all the prev_events.
+ This is used when we have pulled a batch of events from a remote server, and may
+ not have all the prev_events.
- If we already have all the prev_events for `event`, this method does nothing.
+ To build an EventContext, we need to calculate the state before the event. If we
+ already have all the prev_events for `event`, we can simply use the state after
+ the prev_events to calculate the state before `event`.
Otherwise, the missing prevs become new backwards extremities, and we fall back
to asking the remote server for the state after each missing `prev_event`,
@@ -816,8 +935,7 @@ class FederationEventHandler:
event: an event to check for missing prevs.
Returns:
- if we already had all the prev events, `None`. Otherwise, returns
- the event ids of the state at `event`.
+ The event context.
Raises:
FederationError if we fail to get the state from the remote server after any
@@ -831,7 +949,7 @@ class FederationEventHandler:
missing_prevs = prevs - seen
if not missing_prevs:
- return None
+ return await self._state_handler.compute_event_context(event)
logger.info(
"Event %s is missing prev_events %s: calculating state for a "
@@ -843,9 +961,15 @@ class FederationEventHandler:
# resolve them to find the correct state at the current event.
try:
+ # Determine whether we may be about to retrieve partial state.
+ # Events may be un-partial stated right after we compute the partial-state
+ # flag, but that's okay, as long as the flag errs on the conservative side.
+ partial_state_flags = await self._store.get_partial_state_events(seen)
+ partial_state = any(partial_state_flags.values())
+
# Get the state of the events we know about
ours = await self._state_storage_controller.get_state_groups_ids(
- room_id, seen
+ room_id, seen, await_full_state=False
)
# state_maps is a list of mappings from (type, state_key) to event_id
@@ -891,8 +1015,12 @@ class FederationEventHandler:
"We can't get valid state history.",
affected=event_id,
)
- return state_map
+ return await self._state_handler.compute_event_context(
+ event, state_ids_before_event=state_map, partial_state=partial_state
+ )
+ @trace
+ @tag_args
async def _get_state_ids_after_missing_prev_event(
self,
destination: str,
@@ -913,6 +1041,14 @@ class FederationEventHandler:
InvalidResponseError: if the remote homeserver's response contains fields
of the wrong type.
"""
+
+ # It would be better if we could query the difference from our known
+ # state to the given `event_id` so the sending server doesn't have to
+ # send as much and we don't have to process as many events. For example
+ # in a room like #matrix:matrix.org, we get 200k events (77k state_events, 122k
+ # auth_events) from this call.
+ #
+ # Tracked by https://github.com/matrix-org/synapse/issues/13618
(
state_event_ids,
auth_event_ids,
@@ -932,10 +1068,10 @@ class FederationEventHandler:
logger.debug("Fetching %i events from cache/store", len(desired_events))
have_events = await self._store.have_seen_events(room_id, desired_events)
- missing_desired_events = desired_events - have_events
+ missing_desired_event_ids = desired_events - have_events
logger.debug(
"We are missing %i events (got %i)",
- len(missing_desired_events),
+ len(missing_desired_event_ids),
len(have_events),
)
@@ -947,13 +1083,30 @@ class FederationEventHandler:
# already have a bunch of the state events. It would be nice if the
# federation api gave us a way of finding out which we actually need.
- missing_auth_events = set(auth_event_ids) - have_events
- missing_auth_events.difference_update(
- await self._store.have_seen_events(room_id, missing_auth_events)
+ missing_auth_event_ids = set(auth_event_ids) - have_events
+ missing_auth_event_ids.difference_update(
+ await self._store.have_seen_events(room_id, missing_auth_event_ids)
)
- logger.debug("We are also missing %i auth events", len(missing_auth_events))
+ logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
- missing_events = missing_desired_events | missing_auth_events
+ missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
+
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
+ str(missing_auth_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
+ str(len(missing_auth_event_ids)),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
+ str(missing_desired_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
+ str(len(missing_desired_event_ids)),
+ )
# Making an individual request for each of 1000s of events has a lot of
# overhead. On the other hand, we don't really want to fetch all of the events
@@ -964,13 +1117,13 @@ class FederationEventHandler:
#
# TODO: might it be better to have an API which lets us do an aggregate event
# request
- if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
+ if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
logger.debug("Requesting complete state from remote")
await self._get_state_and_persist(destination, room_id, event_id)
else:
- logger.debug("Fetching %i events from remote", len(missing_events))
+ logger.debug("Fetching %i events from remote", len(missing_event_ids))
await self._get_events_and_persist(
- destination=destination, room_id=room_id, event_ids=missing_events
+ destination=destination, room_id=room_id, event_ids=missing_event_ids
)
# We now need to fill out the state map, which involves fetching the
@@ -1018,12 +1171,23 @@ class FederationEventHandler:
# XXX: this doesn't sound right? it means that we'll end up with incomplete
# state.
failed_to_fetch = desired_events - event_metadata.keys()
+ # `event_id` could be missing from `event_metadata` because it's not necessarily
+ # a state event. We've already checked that we've fetched it above.
+ failed_to_fetch.discard(event_id)
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state events for %s %s",
event_id,
failed_to_fetch,
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "failed_to_fetch",
+ str(failed_to_fetch),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
+ str(len(failed_to_fetch)),
+ )
if remote_event.is_state() and remote_event.rejected_reason is None:
state_map[
@@ -1032,6 +1196,8 @@ class FederationEventHandler:
return state_map
+ @trace
+ @tag_args
async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1053,11 +1219,12 @@ class FederationEventHandler:
destination=destination, room_id=room_id, event_ids=(event_id,)
)
+ @trace
async def _process_received_pdu(
self,
origin: str,
event: EventBase,
- state_ids: Optional[StateMap[str]],
+ context: EventContext,
backfilled: bool = False,
) -> None:
"""Called when we have a new non-outlier event.
@@ -1079,37 +1246,30 @@ class FederationEventHandler:
event: event to be persisted
- state_ids: Normally None, but if we are handling a gap in the graph
- (ie, we are missing one or more prev_events), the resolved state at the
- event
+ context: The `EventContext` to persist the event with.
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
+
+ Raises:
+ PartialStateConflictError: if the room was un-partial stated in between
+ computing the state at the event and persisting it. The caller should
+ recompute `context` and retry exactly once when this happens.
"""
logger.debug("Processing event: %s", event)
assert not event.internal_metadata.outlier
try:
- context = await self._state_handler.compute_event_context(
- event,
- state_ids_before_event=state_ids,
- )
- context = await self._check_event_auth(
- origin,
- event,
- context,
- )
+ await self._check_event_auth(origin, event, context)
except AuthError as e:
- # FIXME richvdh 2021/10/07 I don't think this is reachable. Let's log it
- # for now
- logger.exception("Unexpected AuthError from _check_event_auth")
+ # This happens only if we couldn't find the auth events. We'll already have
+ # logged a warning, so now we just convert to a FederationError.
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
if not backfilled and not context.rejected:
# For new (non-backfilled and non-outlier) events we check if the event
# passes auth based on the current state. If it doesn't then we
# "soft-fail" the event.
- await self._check_for_soft_fail(event, state_ids, origin=origin)
+ await self._check_for_soft_fail(event, context=context, origin=origin)
await self._run_push_actions_and_persist_event(event, context, backfilled)
@@ -1210,6 +1370,7 @@ class FederationEventHandler:
except Exception:
logger.exception("Failed to resync device for %s", sender)
+ @trace
async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
@@ -1241,7 +1402,7 @@ class FederationEventHandler:
logger.debug("_handle_marker_event: received %s", marker_event)
insertion_event_id = marker_event.content.get(
- EventContentFields.MSC2716_MARKER_INSERTION
+ EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
)
if insertion_event_id is None:
@@ -1294,6 +1455,55 @@ class FederationEventHandler:
marker_event,
)
+ async def backfill_event_id(
+ self, destination: str, room_id: str, event_id: str
+ ) -> EventBase:
+ """Backfill a single event and persist it as a non-outlier which means
+ we also pull in all of the state and auth events necessary for it.
+
+ Args:
+ destination: The homeserver to pull the given event_id from.
+ room_id: The room where the event is from.
+ event_id: The event ID to backfill.
+
+ Raises:
+ FederationError if we are unable to find the event from the destination
+ """
+ logger.info(
+ "backfill_event_id: event_id=%s from destination=%s", event_id, destination
+ )
+
+ room_version = await self._store.get_room_version(room_id)
+
+ event_from_response = await self._federation_client.get_pdu(
+ [destination],
+ event_id,
+ room_version,
+ )
+
+ if not event_from_response:
+ raise FederationError(
+ "ERROR",
+ 404,
+ "Unable to find event_id=%s from destination=%s to backfill."
+ % (event_id, destination),
+ affected=event_id,
+ )
+
+ # Persist the event we just fetched, including pulling all of the state
+ # and auth events to de-outlier it. This also sets up the necessary
+ # `state_groups` for the event.
+ await self._process_pulled_events(
+ destination,
+ [event_from_response],
+ # Prevent notifications going to clients
+ backfilled=True,
+ )
+
+ return event_from_response
+
+ @trace
+ @tag_args
async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
@@ -1339,6 +1549,7 @@ class FederationEventHandler:
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_outliers(room_id, events)
+ @trace
async def _auth_and_persist_outliers(
self, room_id: str, events: Iterable[EventBase]
) -> None:
@@ -1357,6 +1568,16 @@ class FederationEventHandler:
"""
event_map = {event.event_id: event for event in events}
+ event_ids = event_map.keys()
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+ str(event_ids),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+ str(len(event_ids)),
+ )
+
# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (eg, during a room join), or because
# another thread has raced against us since we decided to request the event.
@@ -1428,10 +1649,9 @@ class FederationEventHandler:
allow_rejected=True,
)
- room_version = await self._store.get_room_version_id(room_id)
- room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+ events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []
- def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
+ async def prep(event: EventBase) -> None:
with nested_logging_context(suffix=event.event_id):
auth = []
for auth_event_id in event.auth_event_ids():
@@ -1445,7 +1665,7 @@ class FederationEventHandler:
event,
auth_event_id,
)
- return None
+ return
auth.append(ae)
# we're not bothering about room state, so flag the event as an outlier.
@@ -1453,42 +1673,42 @@ class FederationEventHandler:
context = EventContext.for_outlier(self._storage_controllers)
try:
- validate_event_for_room_version(room_version_obj, event)
- check_auth_rules_for_event(room_version_obj, event, auth)
+ validate_event_for_room_version(event)
+ await check_state_independent_auth_rules(self._store, event)
+ check_state_dependent_auth_rules(event, auth)
except AuthError as e:
logger.warning("Rejecting %r because %s", event, e)
context.rejected = RejectedReason.AUTH_ERROR
- return event, context
+ events_and_contexts_to_persist.append((event, context))
+
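+ # `prep` is now a coroutine: the state-independent auth checks it runs may
+ # hit the database, so we await it per event and collect the results in
+ # `events_and_contexts_to_persist` rather than using a generator.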
+ for event in fetched_events:
+ await prep(event)
- events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
await self.persist_events_and_notify(
room_id,
- tuple(events_to_persist),
+ events_and_contexts_to_persist,
# Mark these events backfilled as they're historic events that will
# eventually be backfilled. For example, missing events we fetch
# during backfill should be marked as backfilled as well.
backfilled=True,
)
+ @trace
async def _check_event_auth(
- self,
- origin: str,
- event: EventBase,
- context: EventContext,
- ) -> EventContext:
+ self, origin: Optional[str], event: EventBase, context: EventContext
+ ) -> None:
"""
Checks whether an event should be rejected (for failing auth checks).
Args:
- origin: The host the event originates from.
+ origin: The host the event originates from. This is used to fetch
+ any missing auth events. It can be set to None, but only if we are
+ sure that we already have all the auth events.
event: The event itself.
context:
The event context.
- Returns:
- The updated context object.
-
Raises:
AuthError if we were unable to find copies of the event's auth events.
(Most other failures just cause us to set `context.rejected`.)
@@ -1497,16 +1717,13 @@ class FederationEventHandler:
assert not event.internal_metadata.outlier
# first of all, check that the event itself is valid.
- room_version = await self._store.get_room_version_id(event.room_id)
- room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
try:
- validate_event_for_room_version(room_version_obj, event)
+ validate_event_for_room_version(event)
except AuthError as e:
logger.warning("While validating received event %r: %s", event, e)
# TODO: use a different rejected reason here?
context.rejected = RejectedReason.AUTH_ERROR
- return context
+ return
# next, check that we have all of the event's auth events.
#
@@ -1516,66 +1733,112 @@ class FederationEventHandler:
claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
origin, event
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "claimed_auth_events",
+ str([ev.event_id for ev in claimed_auth_events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
+ str(len(claimed_auth_events)),
+ )
# ... and check that the event passes auth at those auth events.
+ # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
+ # 4. Passes authorization rules based on the event’s auth events,
+ # otherwise it is rejected.
try:
- check_auth_rules_for_event(room_version_obj, event, claimed_auth_events)
+ await check_state_independent_auth_rules(self._store, event)
+ check_state_dependent_auth_rules(event, claimed_auth_events)
except AuthError as e:
logger.warning(
"While checking auth of %r against auth_events: %s", event, e
)
context.rejected = RejectedReason.AUTH_ERROR
- return context
+ return
+
+ # now check the auth rules pass against the room state before the event
+ # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
+ # 5. Passes authorization rules based on the state before the event,
+ # otherwise it is rejected.
+ #
+ # ... however, if we only have partial state for the room, then there is a good
+ # chance that we'll be missing some of the state needed to auth the new event.
+ # So, we state-resolve the auth events that we are given against the state that
+ # we know about, which ensures things like bans are applied. (Note that we'll
+ # already have checked we have all the auth events, in
+ # _load_or_fetch_auth_events_for_event above)
+ if context.partial_state:
+ room_version = await self._store.get_room_version_id(event.room_id)
+
+ local_state_id_map = await context.get_prev_state_ids()
+ claimed_auth_events_id_map = {
+ (ev.type, ev.state_key): ev.event_id for ev in claimed_auth_events
+ }
+
+ state_for_auth_id_map = (
+ await self._state_resolution_handler.resolve_events_with_store(
+ event.room_id,
+ room_version,
+ [local_state_id_map, claimed_auth_events_id_map],
+ event_map=None,
+ state_res_store=StateResolutionStore(self._store),
+ )
+ )
+ else:
+ event_types = event_auth.auth_types_for_event(event.room_version, event)
+ state_for_auth_id_map = await context.get_prev_state_ids(
+ StateFilter.from_types(event_types)
+ )
- # now check auth against what we think the auth events *should* be.
- event_types = event_auth.auth_types_for_event(event.room_version, event)
- prev_state_ids = await context.get_prev_state_ids(
- StateFilter.from_types(event_types)
+ calculated_auth_event_ids = self._event_auth_handler.compute_auth_events(
+ event, state_for_auth_id_map, for_verification=True
)
- auth_events_ids = self._event_auth_handler.compute_auth_events(
- event, prev_state_ids, for_verification=True
+ # if those are the same, we're done here.
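+ # (comparing `collections.Counter`s treats the ID lists as multisets, so
+ # duplicated auth event IDs are caught too, unlike a plain set comparison)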
+ if collections.Counter(event.auth_event_ids()) == collections.Counter(
+ calculated_auth_event_ids
+ ):
+ return
+
+ # otherwise, re-run the auth checks based on what we calculated.
+ calculated_auth_events = await self._store.get_events_as_list(
+ calculated_auth_event_ids
)
- auth_events_x = await self._store.get_events(auth_events_ids)
+
+ # log the differences
+
+ claimed_auth_event_map = {(e.type, e.state_key): e for e in claimed_auth_events}
calculated_auth_event_map = {
- (e.type, e.state_key): e for e in auth_events_x.values()
+ (e.type, e.state_key): e for e in calculated_auth_events
}
+ logger.info(
+ "event's auth_events are different to our calculated auth_events. "
+ "Claimed but not calculated: %s. Calculated but not claimed: %s",
+ [
+ ev
+ for k, ev in claimed_auth_event_map.items()
+ if k not in calculated_auth_event_map
+ or calculated_auth_event_map[k].event_id != ev.event_id
+ ],
+ [
+ ev
+ for k, ev in calculated_auth_event_map.items()
+ if k not in claimed_auth_event_map
+ or claimed_auth_event_map[k].event_id != ev.event_id
+ ],
+ )
try:
- updated_auth_events = await self._update_auth_events_for_auth(
+ check_state_dependent_auth_rules(event, calculated_auth_events)
+ except AuthError as e:
+ logger.warning(
+ "While checking auth of %r against room state before the event: %s",
event,
- calculated_auth_event_map=calculated_auth_event_map,
- )
- except Exception:
- # We don't really mind if the above fails, so lets not fail
- # processing if it does. However, it really shouldn't fail so
- # let's still log as an exception since we'll still want to fix
- # any bugs.
- logger.exception(
- "Failed to double check auth events for %s with remote. "
- "Ignoring failure and continuing processing of event.",
- event.event_id,
- )
- updated_auth_events = None
-
- if updated_auth_events:
- context = await self._update_context_for_auth_events(
- event, context, updated_auth_events
- )
- auth_events_for_auth = updated_auth_events
- else:
- auth_events_for_auth = calculated_auth_event_map
-
- try:
- check_auth_rules_for_event(
- room_version_obj, event, auth_events_for_auth.values()
+ e,
)
- except AuthError as e:
- logger.warning("Failed auth resolution for %r because %s", event, e)
context.rejected = RejectedReason.AUTH_ERROR
- return context
-
+ @trace
async def _maybe_kick_guest_users(self, event: EventBase) -> None:
if event.type != EventTypes.GuestAccess:
return
@@ -1593,17 +1856,27 @@ class FederationEventHandler:
async def _check_for_soft_fail(
self,
event: EventBase,
- state_ids: Optional[StateMap[str]],
+ context: EventContext,
origin: str,
) -> None:
"""Checks if we should soft fail the event; if so, marks the event as
such.
+ Does nothing for events in rooms with partial state, since we may not have an
+ accurate membership event for the sender in the current state.
+
Args:
event
- state_ids: The state at the event if we don't have all the event's prev events
+ context: The `EventContext` which we are about to persist the event with.
origin: The host the event originates from.
"""
+ if await self._store.is_partial_state_room(event.room_id):
+ # We might not know the sender's membership in the current state, so don't
+ # soft fail anything. Even if we do have a membership for the sender in the
+ # current state, it may have been derived from state resolution between
+ # partial and full state and may not be accurate.
+ return
+
extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id)
extrem_ids = set(extrem_ids_list)
prev_event_ids = set(event.prev_event_ids())
@@ -1620,11 +1893,15 @@ class FederationEventHandler:
auth_types = auth_types_for_event(room_version_obj, event)
# Calculate the "current state".
- if state_ids is not None:
- # If we're explicitly given the state then we won't have all the
- # prev events, and so we have a gap in the graph. In this case
- # we want to be a little careful as we might have been down for
- # a while and have an incorrect view of the current state,
+ seen_event_ids = await self._store.have_events_in_timeline(prev_event_ids)
+ has_missing_prevs = bool(prev_event_ids - seen_event_ids)
+ if has_missing_prevs:
+ # We don't have all the prev_events of this event, which means we have a
+ # gap in the graph, and the new event is going to become a new backwards
+ # extremity.
+ #
+ # In this case we want to be a little careful as we might have been
+ # down for a while and have an incorrect view of the current state,
# however we still want to do checks as gaps are easy to
# maliciously manufacture.
#
@@ -1637,6 +1914,7 @@ class FederationEventHandler:
event.room_id, extrem_ids
)
state_sets: List[StateMap[str]] = list(state_sets_d.values())
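+ # The state before the new event (via `context`) is resolved together with
+ # the state at each current extremity below.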
+ state_ids = await context.get_prev_state_ids()
state_sets.append(state_ids)
current_state_ids = (
await self._state_resolution_handler.resolve_events_with_store(
@@ -1669,7 +1947,7 @@ class FederationEventHandler:
)
try:
- check_auth_rules_for_event(room_version_obj, event, current_auth_events)
+ check_state_dependent_auth_rules(event, current_auth_events)
except AuthError as e:
logger.warning(
"Soft-failing %r (from %s) because %s",
@@ -1685,95 +1963,8 @@ class FederationEventHandler:
soft_failed_event_counter.inc()
event.internal_metadata.soft_failed = True
- async def _update_auth_events_for_auth(
- self,
- event: EventBase,
- calculated_auth_event_map: StateMap[EventBase],
- ) -> Optional[StateMap[EventBase]]:
- """Helper for _check_event_auth. See there for docs.
-
- Checks whether a given event has the expected auth events. If it
- doesn't then we talk to the remote server to compare state to see if
- we can come to a consensus (e.g. if one server missed some valid
- state).
-
- This attempts to resolve any potential divergence of state between
- servers, but is not essential and so failures should not block further
- processing of the event.
-
- Args:
- event:
-
- calculated_auth_event_map:
- Our calculated auth_events based on the state of the room
- at the event's position in the DAG.
-
- Returns:
- updated auth event map, or None if no changes are needed.
-
- """
- assert not event.internal_metadata.outlier
-
- # check for events which are in the event's claimed auth_events, but not
- # in our calculated event map.
- event_auth_events = set(event.auth_event_ids())
- different_auth = event_auth_events.difference(
- e.event_id for e in calculated_auth_event_map.values()
- )
-
- if not different_auth:
- return None
-
- logger.info(
- "auth_events refers to events which are not in our calculated auth "
- "chain: %s",
- different_auth,
- )
-
- # XXX: currently this checks for redactions but I'm not convinced that is
- # necessary?
- different_events = await self._store.get_events_as_list(different_auth)
-
- # double-check they're all in the same room - we should already have checked
- # this but it doesn't hurt to check again.
- for d in different_events:
- assert (
- d.room_id == event.room_id
- ), f"Event {event.event_id} refers to auth_event {d.event_id} which is in a different room"
-
- # now we state-resolve between our own idea of the auth events, and the remote's
- # idea of them.
-
- local_state = calculated_auth_event_map.values()
- remote_auth_events = dict(calculated_auth_event_map)
- remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
- remote_state = remote_auth_events.values()
-
- room_version = await self._store.get_room_version_id(event.room_id)
- new_state = await self._state_handler.resolve_events(
- room_version, (local_state, remote_state), event
- )
- different_state = {
- (d.type, d.state_key): d
- for d in new_state.values()
- if calculated_auth_event_map.get((d.type, d.state_key)) != d
- }
- if not different_state:
- logger.info("State res returned no new state")
- return None
-
- logger.info(
- "After state res: updating auth_events with new state %s",
- different_state.values(),
- )
-
- # take a copy of calculated_auth_event_map before we modify it.
- auth_events = dict(calculated_auth_event_map)
- auth_events.update(different_state)
- return auth_events
-
async def _load_or_fetch_auth_events_for_event(
- self, destination: str, event: EventBase
+ self, destination: Optional[str], event: EventBase
) -> Collection[EventBase]:
"""Fetch this event's auth_events, from database or remote
@@ -1789,12 +1980,19 @@ class FederationEventHandler:
Args:
destination: where to send the /event_auth request. Typically the server
that sent us `event` in the first place.
+
+ If this is None, no attempt is made to load any missing auth events:
+ rather, an AssertionError is raised if there are any missing events.
+
event: the event whose auth_events we want
Returns:
all of the events listed in `event.auth_events_ids`, after deduplication
Raises:
+ AssertionError if some auth events were missing and no `destination` was
+ supplied.
+
AuthError if we were unable to fetch the auth_events for any reason.
"""
event_auth_event_ids = set(event.auth_event_ids())
@@ -1806,6 +2004,13 @@ class FederationEventHandler:
)
if not missing_auth_event_ids:
return event_auth_events.values()
+ if destination is None:
+ # this shouldn't happen: destination must be set unless we know we have already
+ # persisted the auth events.
+ raise AssertionError(
+ "_load_or_fetch_auth_events_for_event() called with no destination for "
+ "an event with missing auth_events"
+ )
logger.info(
"Event %s refers to unknown auth events %s: fetching auth chain",
@@ -1841,6 +2046,8 @@ class FederationEventHandler:
# instead we raise an AuthError, which will make the caller ignore it.
raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
+ @trace
+ @tag_args
async def _get_remote_auth_chain_for_event(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1869,61 +2076,7 @@ class FederationEventHandler:
await self._auth_and_persist_outliers(room_id, remote_auth_events)
- async def _update_context_for_auth_events(
- self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
- ) -> EventContext:
- """Update the state_ids in an event context after auth event resolution,
- storing the changes as a new state group.
-
- Args:
- event: The event we're handling the context for
-
- context: initial event context
-
- auth_events: Events to update in the event context.
-
- Returns:
- new event context
- """
- # exclude the state key of the new event from the current_state in the context.
- if event.is_state():
- event_key: Optional[Tuple[str, str]] = (event.type, event.state_key)
- else:
- event_key = None
- state_updates = {
- k: a.event_id for k, a in auth_events.items() if k != event_key
- }
-
- current_state_ids = await context.get_current_state_ids()
- current_state_ids = dict(current_state_ids) # type: ignore
-
- current_state_ids.update(state_updates)
-
- prev_state_ids = await context.get_prev_state_ids()
- prev_state_ids = dict(prev_state_ids)
-
- prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
-
- # create a new state group as a delta from the existing one.
- prev_group = context.state_group
- state_group = await self._state_storage_controller.store_state_group(
- event.event_id,
- event.room_id,
- prev_group=prev_group,
- delta_ids=state_updates,
- current_state_ids=current_state_ids,
- )
-
- return EventContext.with_state(
- storage=self._storage_controllers,
- state_group=state_group,
- state_group_before_event=context.state_group_before_event,
- state_delta_due_to_event=state_updates,
- prev_group=prev_group,
- delta_ids=state_updates,
- partial_state=context.partial_state,
- )
-
+ @trace
async def _run_push_actions_and_persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> None:
@@ -1933,6 +2086,9 @@ class FederationEventHandler:
event: The event itself.
context: The event context.
backfilled: True if the event was backfilled.
+
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
# this method should not be called on outliers (those code paths call
# persist_events_and_notify directly.)
@@ -1963,9 +2119,7 @@ class FederationEventHandler:
event.room_id, [(event, context)], backfilled=backfilled
)
except Exception:
- run_in_background(
- self._store.remove_push_actions_from_staging, event.event_id
- )
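+ # Await the cleanup directly rather than firing it off in the background,
+ # so the staged push actions are removed before the exception propagates.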
+ await self._store.remove_push_actions_from_staging(event.event_id)
raise
async def persist_events_and_notify(
@@ -1987,6 +2141,10 @@ class FederationEventHandler:
Returns:
The stream ID after which all events have been persisted.
+
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
if not event_and_contexts:
return self._store.get_room_max_stream_ordering()
@@ -1995,14 +2153,19 @@ class FederationEventHandler:
if instance != self._instance_name:
# Limit the number of events sent over replication. We choose 200
# here as that is what we default to in `max_request_body_size(..)`
- for batch in batch_iter(event_and_contexts, 200):
- result = await self._send_events(
- instance_name=instance,
- store=self._store,
- room_id=room_id,
- event_and_contexts=batch,
- backfilled=backfilled,
- )
+ try:
+ for batch in batch_iter(event_and_contexts, 200):
+ result = await self._send_events(
+ instance_name=instance,
+ store=self._store,
+ room_id=room_id,
+ event_and_contexts=batch,
+ backfilled=backfilled,
+ )
+ except SynapseError as e:
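+ # A CONFLICT response from the replication request indicates that the
+ # room was un-partial stated while persisting; convert it back into
+ # the typed exception so the caller can retry.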
+ if e.code == HTTPStatus.CONFLICT:
+ raise PartialStateConflictError()
+ raise
return result["max_stream_id"]
else:
assert self._storage_controllers.persistence
@@ -2022,8 +2185,17 @@ class FederationEventHandler:
self._message_handler.maybe_schedule_expiry(event)
if not backfilled: # Never notify for backfilled events
- for event in events:
- await self._notify_persisted_event(event, max_stream_token)
+ with start_active_span("notify_persisted_events"):
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "event_ids",
+ str([ev.event_id for ev in events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "event_ids.length",
+ str(len(events)),
+ )
+ for event in events:
+ await self._notify_persisted_event(event, max_stream_token)
return max_stream_token.stream
@@ -2064,6 +2236,10 @@ class FederationEventHandler:
event, event_pos, max_stream_token, extra_users=extra_users
)
+ if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+ # TODO retrieve the previous state, and exclude join -> join transitions
+ self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+
def _sanity_check_event(self, ev: EventBase) -> None:
"""
Do some early sanity checks of a received event
|