diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 161b3c933c..34725324a6 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -627,23 +627,28 @@ class AuthHandler(BaseHandler):
async def add_oob_auth(
self, stagetype: str, authdict: Dict[str, Any], clientip: str
- ) -> bool:
+ ) -> None:
"""
Adds the result of out-of-band authentication into an existing auth
session. Currently used for adding the result of fallback auth.
+
+ Raises:
+ LoginError if the stagetype is unknown or the session is missing.
+ LoginError is also raised by check_auth if authentication fails.
"""
if stagetype not in self.checkers:
- raise LoginError(400, "", Codes.MISSING_PARAM)
+ raise LoginError(
+ 400, f"Unknown UIA stage type: {stagetype}", Codes.INVALID_PARAM
+ )
if "session" not in authdict:
- raise LoginError(400, "", Codes.MISSING_PARAM)
+ raise LoginError(400, "Missing session ID", Codes.MISSING_PARAM)
+ # If authentication fails, a LoginError is raised. Otherwise, store
+ # the successful result.
result = await self.checkers[stagetype].check_auth(authdict, clientip)
- if result:
- await self.store.mark_ui_auth_stage_complete(
- authdict["session"], stagetype, result
- )
- return True
- return False
+ await self.store.mark_ui_auth_stage_complete(
+ authdict["session"], stagetype, result
+ )
def get_session_id(self, clientdict: Dict[str, Any]) -> Optional[str]:
"""
@@ -1459,6 +1464,10 @@ class AuthHandler(BaseHandler):
)
await self.store.user_delete_threepid(user_id, medium, address)
+ if medium == "email":
+ await self.store.delete_pusher_by_app_id_pushkey_user_id(
+ app_id="m.email", pushkey=address, user_id=user_id
+ )
return result
async def hash(self, password: str) -> str:
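The new block in delete_threepid above also removes the email pusher keyed by the deleted address, so the server stops sending notification emails to an address the user has just disassociated. A minimal sketch of the same cleanup as a standalone helper (the helper name and the addresses argument are illustrative, not part of this diff):

async def remove_email_pushers(store, user_id: str, addresses: list) -> None:
    # Illustrative sketch: mirror the cleanup done when an email third-party
    # identifier is deleted, dropping the matching email pusher (app_id
    # "m.email", pushkey = the email address) for this user.
    for address in addresses:
        await store.delete_pusher_by_app_id_pushkey_user_id(
            app_id="m.email", pushkey=address, user_id=user_id
        )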
@@ -1727,7 +1736,6 @@ class AuthHandler(BaseHandler):
@attr.s(slots=True)
class MacaroonGenerator:
-
hs = attr.ib()
def generate_guest_access_token(self, user_id: str) -> str:
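For the add_oob_auth change in the first hunk above: the method no longer returns a bool. Success is implied by a normal return (the completed stage is stored against the session), and every failure mode, including a failed check_auth, now surfaces as a LoginError. A hedged sketch of how a caller might adapt, assuming a servlet-style wrapper that is illustrative and not taken from this diff:

from synapse.api.errors import LoginError

async def submit_fallback_auth(auth_handler, stagetype, authdict, clientip):
    # Illustrative only: callers used to branch on a True/False return; with
    # the new contract they either let the LoginError propagate or catch it
    # to shape the HTTP response themselves.
    try:
        await auth_handler.add_oob_auth(stagetype, authdict, clientip)
    except LoginError as e:
        return e.code, {"errcode": e.errcode, "error": e.msg}
    return 200, {}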
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c0e13bdaac..daf1d3bfb3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -17,23 +17,9 @@
import itertools
import logging
-from collections.abc import Container
from http import HTTPStatus
-from typing import (
- TYPE_CHECKING,
- Collection,
- Dict,
- Iterable,
- List,
- Optional,
- Sequence,
- Set,
- Tuple,
- Union,
-)
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
-import attr
-from prometheus_client import Counter
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
@@ -41,19 +27,12 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
from synapse import event_auth
-from synapse.api.constants import (
- EventContentFields,
- EventTypes,
- Membership,
- RejectedReason,
- RoomEncryptionAlgorithms,
-)
+from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.errors import (
AuthError,
CodeMessageException,
Codes,
FederationDeniedError,
- FederationError,
HttpResponseException,
NotFoundError,
RequestSendFailed,
@@ -61,10 +40,10 @@ from synapse.api.errors import (
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
from synapse.crypto.event_signing import compute_event_signature
-from synapse.event_auth import auth_types_for_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
+from synapse.federation.federation_client import InvalidResponseError
from synapse.handlers._base import BaseHandler
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
@@ -74,28 +53,14 @@ from synapse.logging.context import (
run_in_background,
)
from synapse.logging.utils import log_function
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
- ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnOutlierMembershipRestServlet,
)
-from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.types import (
- JsonDict,
- MutableStateMap,
- PersistedEventPosition,
- RoomStreamToken,
- StateMap,
- UserID,
- get_domain_from_id,
-)
-from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.types import JsonDict, StateMap, get_domain_from_id
+from synapse.util.async_helpers import Linearizer
from synapse.util.retryutils import NotRetryingDestination
-from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
if TYPE_CHECKING:
@@ -103,50 +68,11 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-soft_failed_event_counter = Counter(
- "synapse_federation_soft_failed_events_total",
- "Events received over federation that we marked as soft_failed",
-)
-
-
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _NewEventInfo:
- """Holds information about a received event, ready for passing to _auth_and_persist_events
-
- Attributes:
- event: the received event
-
- state: the state at that event, according to /state_ids from a remote
- homeserver. Only populated for backfilled events which are going to be a
- new backwards extremity.
-
- claimed_auth_event_map: a map of (type, state_key) => event for the event's
- claimed auth_events.
-
- This can include events which have not yet been persisted, in the case that
- we are backfilling a batch of events.
-
- Note: May be incomplete: if we were unable to find all of the claimed auth
- events. Also, treat the contents with caution: the events might also have
- been rejected, might not yet have been authorized themselves, or they might
- be in the wrong room.
-
- """
-
- event: EventBase
- state: Optional[Sequence[EventBase]]
- claimed_auth_event_map: StateMap[EventBase]
-
class FederationHandler(BaseHandler):
- """Handles events that originated from federation.
- Responsible for:
- a) handling received Pdus before handing them on as Events to the rest
- of the homeserver (including auth and state conflict resolutions)
- b) converting events that were produced by local clients that may need
- to be sent to remote homeservers.
- c) doing the necessary dances to invite remote users and join remote
- rooms.
+ """Handles general incoming federation requests
+
+ Incoming events are *not* handled here; see FederationEventHandler for that.
"""
def __init__(self, hs: "HomeServer"):
@@ -159,979 +85,35 @@ class FederationHandler(BaseHandler):
self.state_store = self.storage.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
- self._state_resolution_handler = hs.get_state_resolution_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
- self.action_generator = hs.get_action_generator()
self.is_mine_id = hs.is_mine_id
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._event_auth_handler = hs.get_event_auth_handler()
- self._message_handler = hs.get_message_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_proxied_blacklisted_http_client()
- self._instance_name = hs.get_instance_name()
self._replication = hs.get_replication_data_handler()
+ self._federation_event_handler = hs.get_federation_event_handler()
- self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
)
if hs.config.worker_app:
- self._user_device_resync = (
- ReplicationUserDevicesResyncRestServlet.make_client(hs)
- )
self._maybe_store_room_on_outlier_membership = (
ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs)
)
else:
- self._device_list_updater = hs.get_device_handler().device_list_updater
self._maybe_store_room_on_outlier_membership = (
self.store.maybe_store_room_on_outlier_membership
)
- # When joining a room we need to queue any events for that room up.
- # For each room, a list of (pdu, origin) tuples.
- self.room_queues: Dict[str, List[Tuple[EventBase, str]]] = {}
- self._room_pdu_linearizer = Linearizer("fed_room_pdu")
-
self._room_backfill = Linearizer("room_backfill")
self.third_party_event_rules = hs.get_third_party_event_rules()
- self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
-
- async def on_receive_pdu(
- self, origin: str, pdu: EventBase, sent_to_us_directly: bool = False
- ) -> None:
- """Process a PDU received via a federation /send/ transaction, or
- via backfill of missing prev_events
-
- Args:
- origin: server which initiated the /send/ transaction. Will
- be used to fetch missing events or state.
- pdu: received PDU
- sent_to_us_directly: True if this event was pushed to us; False if
- we pulled it as the result of a missing prev_event.
- """
-
- room_id = pdu.room_id
- event_id = pdu.event_id
-
- # We reprocess pdus when we have seen them only as outliers
- existing = await self.store.get_event(
- event_id, allow_none=True, allow_rejected=True
- )
-
- # FIXME: Currently we fetch an event again when we already have it
- # if it has been marked as an outlier.
- if existing:
- if not existing.internal_metadata.is_outlier():
- logger.info(
- "Ignoring received event %s which we have already seen", event_id
- )
- return
- if pdu.internal_metadata.is_outlier():
- logger.info(
- "Ignoring received outlier %s which we already have as an outlier",
- event_id,
- )
- return
- logger.info("De-outliering event %s", event_id)
-
- # do some initial sanity-checking of the event. In particular, make
- # sure it doesn't have hundreds of prev_events or auth_events, which
- # could cause a huge state resolution or cascade of event fetches.
- try:
- self._sanity_check_event(pdu)
- except SynapseError as err:
- logger.warning("Received event failed sanity checks")
- raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
-
- # If we are currently in the process of joining this room, then we
- # queue up events for later processing.
- if room_id in self.room_queues:
- logger.info(
- "Queuing PDU from %s for now: join in progress",
- origin,
- )
- self.room_queues[room_id].append((pdu, origin))
- return
-
- # If we're not in the room just ditch the event entirely. This is
- # probably an old server that has come back and thinks we're still in
- # the room (or we've been rejoined to the room by a state reset).
- #
- # Note that if we were never in the room then we would have already
- # dropped the event, since we wouldn't know the room version.
- is_in_room = await self._event_auth_handler.check_host_in_room(
- room_id, self.server_name
- )
- if not is_in_room:
- logger.info(
- "Ignoring PDU from %s as we're not in the room",
- origin,
- )
- return None
-
- state = None
-
- # Check that the event passes auth based on the state at the event. This is
- # done for events that are to be added to the timeline (non-outliers).
- #
- # Get missing pdus if necessary:
- # - Fetching any missing prev events to fill in gaps in the graph
- # - Fetching state if we have a hole in the graph
- if not pdu.internal_metadata.is_outlier():
- # We only backfill backwards to the min depth.
- min_depth = await self.get_min_depth_for_context(pdu.room_id)
-
- logger.debug("min_depth: %d", min_depth)
-
- prevs = set(pdu.prev_event_ids())
- seen = await self.store.have_events_in_timeline(prevs)
-
- if min_depth is not None and pdu.depth < min_depth:
- # This is so that we don't notify the user about this
- # message, to work around the fact that some events will
- # reference really really old events we really don't want to
- # send to the clients.
- pdu.internal_metadata.outlier = True
- elif min_depth is not None and pdu.depth > min_depth:
- missing_prevs = prevs - seen
- if sent_to_us_directly and missing_prevs:
- # If we're missing stuff, ensure we only fetch stuff one
- # at a time.
- logger.info(
- "Acquiring room lock to fetch %d missing prev_events: %s",
- len(missing_prevs),
- shortstr(missing_prevs),
- )
- with (await self._room_pdu_linearizer.queue(pdu.room_id)):
- logger.info(
- "Acquired room lock to fetch %d missing prev_events",
- len(missing_prevs),
- )
-
- try:
- await self._get_missing_events_for_pdu(
- origin, pdu, prevs, min_depth
- )
- except Exception as e:
- raise Exception(
- "Error fetching missing prev_events for %s: %s"
- % (event_id, e)
- ) from e
-
- # Update the set of things we've seen after trying to
- # fetch the missing stuff
- seen = await self.store.have_events_in_timeline(prevs)
-
- if not prevs - seen:
- logger.info(
- "Found all missing prev_events",
- )
-
- missing_prevs = prevs - seen
- if missing_prevs:
- # We've still not been able to get all of the prev_events for this event.
- #
- # In this case, we need to fall back to asking another server in the
- # federation for the state at this event. That's ok provided we then
- # resolve the state against other bits of the DAG before using it (which
- # will ensure that you can't just take over a room by sending an event,
- # withholding its prev_events, and declaring yourself to be an admin in
- # the subsequent state request).
- #
- # Now, if we're pulling this event as a missing prev_event, then clearly
- # this event is not going to become the only forward-extremity and we are
- # guaranteed to resolve its state against our existing forward
- # extremities, so that should be fine.
- #
- # On the other hand, if this event was pushed to us, it is possible for
- # it to become the only forward-extremity in the room, and we would then
- # trust its state to be the state for the whole room. This is very bad.
- # Further, if the event was pushed to us, there is no excuse for us not to
- # have all the prev_events. We therefore reject any such events.
- #
- # XXX this really feels like it could/should be merged with the above,
- # but there is an interaction with min_depth that I'm not really
- # following.
-
- if sent_to_us_directly:
- logger.warning(
- "Rejecting: failed to fetch %d prev events: %s",
- len(missing_prevs),
- shortstr(missing_prevs),
- )
- raise FederationError(
- "ERROR",
- 403,
- (
- "Your server isn't divulging details about prev_events "
- "referenced in this event."
- ),
- affected=pdu.event_id,
- )
-
- logger.info(
- "Event %s is missing prev_events %s: calculating state for a "
- "backwards extremity",
- event_id,
- shortstr(missing_prevs),
- )
-
- # Calculate the state after each of the previous events, and
- # resolve them to find the correct state at the current event.
- event_map = {event_id: pdu}
- try:
- # Get the state of the events we know about
- ours = await self.state_store.get_state_groups_ids(room_id, seen)
-
- # state_maps is a list of mappings from (type, state_key) to event_id
- state_maps: List[StateMap[str]] = list(ours.values())
-
- # we don't need this any more, let's delete it.
- del ours
-
- # Ask the remote server for the states we don't
- # know about
- for p in missing_prevs:
- logger.info("Requesting state after missing prev_event %s", p)
-
- with nested_logging_context(p):
- # note that if any of the missing prevs share missing state or
- # auth events, the requests to fetch those events are deduped
- # by the get_pdu_cache in federation_client.
- remote_state = (
- await self._get_state_after_missing_prev_event(
- origin, room_id, p
- )
- )
-
- remote_state_map = {
- (x.type, x.state_key): x.event_id for x in remote_state
- }
- state_maps.append(remote_state_map)
-
- for x in remote_state:
- event_map[x.event_id] = x
-
- room_version = await self.store.get_room_version_id(room_id)
- state_map = (
- await self._state_resolution_handler.resolve_events_with_store(
- room_id,
- room_version,
- state_maps,
- event_map,
- state_res_store=StateResolutionStore(self.store),
- )
- )
-
- # We need to give _process_received_pdu the actual state events
- # rather than event ids, so generate that now.
-
- # First though we need to fetch all the events that are in
- # state_map, so we can build up the state below.
- evs = await self.store.get_events(
- list(state_map.values()),
- get_prev_content=False,
- redact_behaviour=EventRedactBehaviour.AS_IS,
- )
- event_map.update(evs)
-
- state = [event_map[e] for e in state_map.values()]
- except Exception:
- logger.warning(
- "Error attempting to resolve state at missing " "prev_events",
- exc_info=True,
- )
- raise FederationError(
- "ERROR",
- 403,
- "We can't get valid state history.",
- affected=event_id,
- )
-
- # A second round of checks for all events. Check that the event passes auth
- # based on `auth_events`, this allows us to assert that the event would
- # have been allowed at some point. If an event passes this check its OK
- # for it to be used as part of a returned `/state` request, as either
- # a) we received the event as part of the original join and so trust it, or
- # b) we'll do a state resolution with existing state before it becomes
- # part of the "current state", which adds more protection.
- await self._process_received_pdu(origin, pdu, state=state)
-
- async def _get_missing_events_for_pdu(
- self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
- ) -> None:
- """
- Args:
- origin: Origin of the pdu. Will be called to get the missing events
- pdu: received pdu
- prevs: List of event ids which we are missing
- min_depth: Minimum depth of events to return.
- """
-
- room_id = pdu.room_id
- event_id = pdu.event_id
-
- seen = await self.store.have_events_in_timeline(prevs)
-
- if not prevs - seen:
- return
-
- latest_list = await self.store.get_latest_event_ids_in_room(room_id)
-
- # We add the prev events that we have seen to the latest
- # list to ensure the remote server doesn't give them to us
- latest = set(latest_list)
- latest |= seen
-
- logger.info(
- "Requesting missing events between %s and %s",
- shortstr(latest),
- event_id,
- )
-
- # XXX: we set timeout to 10s to help workaround
- # https://github.com/matrix-org/synapse/issues/1733.
- # The reason is to avoid holding the linearizer lock
- # whilst processing inbound /send transactions, causing
- # FDs to stack up and block other inbound transactions
- # which empirically can currently take up to 30 minutes.
- #
- # N.B. this explicitly disables retry attempts.
- #
- # N.B. this also increases our chances of falling back to
- # fetching fresh state for the room if the missing event
- # can't be found, which slightly reduces our security.
- # it may also increase our DAG extremity count for the room,
- # causing additional state resolution? See #1760.
- # However, fetching state doesn't hold the linearizer lock
- # apparently.
- #
- # see https://github.com/matrix-org/synapse/pull/1744
- #
- # ----
- #
- # Update richvdh 2018/09/18: There are a number of problems with timing this
- # request out aggressively on the client side:
- #
- # - it plays badly with the server-side rate-limiter, which starts tarpitting you
- # if you send too many requests at once, so you end up with the server carefully
- # working through the backlog of your requests, which you have already timed
- # out.
- #
- # - for this request in particular, we now (as of
- # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
- # server can't produce a plausible-looking set of prev_events - so we becone
- # much more likely to reject the event.
- #
- # - contrary to what it says above, we do *not* fall back to fetching fresh state
- # for the room if get_missing_events times out. Rather, we give up processing
- # the PDU whose prevs we are missing, which then makes it much more likely that
- # we'll end up back here for the *next* PDU in the list, which exacerbates the
- # problem.
- #
- # - the aggressive 10s timeout was introduced to deal with incoming federation
- # requests taking 8 hours to process. It's not entirely clear why that was going
- # on; certainly there were other issues causing traffic storms which are now
- # resolved, and I think in any case we may be more sensible about our locking
- # now. We're *certainly* more sensible about our logging.
- #
- # All that said: Let's try increasing the timeout to 60s and see what happens.
-
- try:
- missing_events = await self.federation_client.get_missing_events(
- origin,
- room_id,
- earliest_events_ids=list(latest),
- latest_events=[pdu],
- limit=10,
- min_depth=min_depth,
- timeout=60000,
- )
- except (RequestSendFailed, HttpResponseException, NotRetryingDestination) as e:
- # We failed to get the missing events, but since we need to handle
- # the case of `get_missing_events` not returning the necessary
- # events anyway, it is safe to simply log the error and continue.
- logger.warning("Failed to get prev_events: %s", e)
- return
-
- logger.info("Got %d prev_events", len(missing_events))
-
- # We want to sort these by depth so we process them and
- # tell clients about them in order.
- missing_events.sort(key=lambda x: x.depth)
-
- for ev in missing_events:
- logger.info("Handling received prev_event %s", ev)
- with nested_logging_context(ev.event_id):
- try:
- await self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
- except FederationError as e:
- if e.code == 403:
- logger.warning(
- "Received prev_event %s failed history check.",
- ev.event_id,
- )
- else:
- raise
-
- async def _get_state_for_room(
- self,
- destination: str,
- room_id: str,
- event_id: str,
- ) -> List[EventBase]:
- """Requests all of the room state at a given event from a remote
- homeserver.
-
- Will also fetch any missing events reported in the `auth_chain_ids`
- section of `/state_ids`.
-
- Args:
- destination: The remote homeserver to query for the state.
- room_id: The id of the room we're interested in.
- event_id: The id of the event we want the state at.
-
- Returns:
- A list of events in the state, not including the event itself.
- """
- (
- state_event_ids,
- auth_event_ids,
- ) = await self.federation_client.get_room_state_ids(
- destination, room_id, event_id=event_id
- )
-
- # Fetch the state events from the DB, and check we have the auth events.
- event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
- auth_events_in_store = await self.store.have_seen_events(
- room_id, auth_event_ids
- )
-
- # Check for missing events. We handle state and auth event seperately,
- # as we want to pull the state from the DB, but we don't for the auth
- # events. (Note: we likely won't use the majority of the auth chain, and
- # it can be *huge* for large rooms, so it's worth ensuring that we don't
- # unnecessarily pull it from the DB).
- missing_state_events = set(state_event_ids) - set(event_map)
- missing_auth_events = set(auth_event_ids) - set(auth_events_in_store)
- if missing_state_events or missing_auth_events:
- await self._get_events_and_persist(
- destination=destination,
- room_id=room_id,
- events=missing_state_events | missing_auth_events,
- )
-
- if missing_state_events:
- new_events = await self.store.get_events(
- missing_state_events, allow_rejected=True
- )
- event_map.update(new_events)
-
- missing_state_events.difference_update(new_events)
-
- if missing_state_events:
- logger.warning(
- "Failed to fetch missing state events for %s %s",
- event_id,
- missing_state_events,
- )
-
- if missing_auth_events:
- auth_events_in_store = await self.store.have_seen_events(
- room_id, missing_auth_events
- )
- missing_auth_events.difference_update(auth_events_in_store)
-
- if missing_auth_events:
- logger.warning(
- "Failed to fetch missing auth events for %s %s",
- event_id,
- missing_auth_events,
- )
-
- remote_state = list(event_map.values())
-
- # check for events which were in the wrong room.
- #
- # this can happen if a remote server claims that the state or
- # auth_events at an event in room A are actually events in room B
-
- bad_events = [
- (event.event_id, event.room_id)
- for event in remote_state
- if event.room_id != room_id
- ]
-
- for bad_event_id, bad_room_id in bad_events:
- # This is a bogus situation, but since we may only discover it a long time
- # after it happened, we try our best to carry on, by just omitting the
- # bad events from the returned auth/state set.
- logger.warning(
- "Remote server %s claims event %s in room %s is an auth/state "
- "event in room %s",
- destination,
- bad_event_id,
- bad_room_id,
- room_id,
- )
-
- if bad_events:
- remote_state = [e for e in remote_state if e.room_id == room_id]
-
- return remote_state
-
- async def _get_state_after_missing_prev_event(
- self,
- destination: str,
- room_id: str,
- event_id: str,
- ) -> List[EventBase]:
- """Requests all of the room state at a given event from a remote homeserver.
-
- Args:
- destination: The remote homeserver to query for the state.
- room_id: The id of the room we're interested in.
- event_id: The id of the event we want the state at.
-
- Returns:
- A list of events in the state, including the event itself
- """
- # TODO: This function is basically the same as _get_state_for_room. Can
- # we make backfill() use it, rather than having two code paths? I think the
- # only difference is that backfill() persists the prev events separately.
-
- (
- state_event_ids,
- auth_event_ids,
- ) = await self.federation_client.get_room_state_ids(
- destination, room_id, event_id=event_id
- )
-
- logger.debug(
- "state_ids returned %i state events, %i auth events",
- len(state_event_ids),
- len(auth_event_ids),
- )
-
- # start by just trying to fetch the events from the store
- desired_events = set(state_event_ids)
- desired_events.add(event_id)
- logger.debug("Fetching %i events from cache/store", len(desired_events))
- fetched_events = await self.store.get_events(
- desired_events, allow_rejected=True
- )
-
- missing_desired_events = desired_events - fetched_events.keys()
- logger.debug(
- "We are missing %i events (got %i)",
- len(missing_desired_events),
- len(fetched_events),
- )
-
- # We probably won't need most of the auth events, so let's just check which
- # we have for now, rather than thrashing the event cache with them all
- # unnecessarily.
-
- # TODO: we probably won't actually need all of the auth events, since we
- # already have a bunch of the state events. It would be nice if the
- # federation api gave us a way of finding out which we actually need.
-
- missing_auth_events = set(auth_event_ids) - fetched_events.keys()
- missing_auth_events.difference_update(
- await self.store.have_seen_events(room_id, missing_auth_events)
- )
- logger.debug("We are also missing %i auth events", len(missing_auth_events))
-
- missing_events = missing_desired_events | missing_auth_events
- logger.debug("Fetching %i events from remote", len(missing_events))
- await self._get_events_and_persist(
- destination=destination, room_id=room_id, events=missing_events
- )
-
- # we need to make sure we re-load from the database to get the rejected
- # state correct.
- fetched_events.update(
- await self.store.get_events(missing_desired_events, allow_rejected=True)
- )
-
- # check for events which were in the wrong room.
- #
- # this can happen if a remote server claims that the state or
- # auth_events at an event in room A are actually events in room B
-
- bad_events = [
- (event_id, event.room_id)
- for event_id, event in fetched_events.items()
- if event.room_id != room_id
- ]
-
- for bad_event_id, bad_room_id in bad_events:
- # This is a bogus situation, but since we may only discover it a long time
- # after it happened, we try our best to carry on, by just omitting the
- # bad events from the returned state set.
- logger.warning(
- "Remote server %s claims event %s in room %s is an auth/state "
- "event in room %s",
- destination,
- bad_event_id,
- bad_room_id,
- room_id,
- )
-
- del fetched_events[bad_event_id]
-
- # if we couldn't get the prev event in question, that's a problem.
- remote_event = fetched_events.get(event_id)
- if not remote_event:
- raise Exception("Unable to get missing prev_event %s" % (event_id,))
-
- # missing state at that event is a warning, not a blocker
- # XXX: this doesn't sound right? it means that we'll end up with incomplete
- # state.
- failed_to_fetch = desired_events - fetched_events.keys()
- if failed_to_fetch:
- logger.warning(
- "Failed to fetch missing state events for %s %s",
- event_id,
- failed_to_fetch,
- )
-
- remote_state = [
- fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events
- ]
-
- if remote_event.is_state() and remote_event.rejected_reason is None:
- remote_state.append(remote_event)
-
- return remote_state
-
- async def _process_received_pdu(
- self,
- origin: str,
- event: EventBase,
- state: Optional[Iterable[EventBase]],
- ) -> None:
- """Called when we have a new pdu. We need to do auth checks and put it
- through the StateHandler.
-
- Args:
- origin: server sending the event
-
- event: event to be persisted
-
- state: Normally None, but if we are handling a gap in the graph
- (ie, we are missing one or more prev_events), the resolved state at the
- event
- """
- logger.debug("Processing event: %s", event)
-
- try:
- context = await self.state_handler.compute_event_context(
- event, old_state=state
- )
- await self._auth_and_persist_event(origin, event, context, state=state)
- except AuthError as e:
- raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
-
- # For encrypted messages we check that we know about the sending device,
- # if we don't then we mark the device cache for that user as stale.
- if event.type == EventTypes.Encrypted:
- device_id = event.content.get("device_id")
- sender_key = event.content.get("sender_key")
-
- cached_devices = await self.store.get_cached_devices_for_user(event.sender)
-
- resync = False # Whether we should resync device lists.
-
- device = None
- if device_id is not None:
- device = cached_devices.get(device_id)
- if device is None:
- logger.info(
- "Received event from remote device not in our cache: %s %s",
- event.sender,
- device_id,
- )
- resync = True
-
- # We also check if the `sender_key` matches what we expect.
- if sender_key is not None:
- # Figure out what sender key we're expecting. If we know the
- # device and recognize the algorithm then we can work out the
- # exact key to expect. Otherwise check it matches any key we
- # have for that device.
-
- current_keys: Container[str] = []
-
- if device:
- keys = device.get("keys", {}).get("keys", {})
-
- if (
- event.content.get("algorithm")
- == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
- ):
- # For this algorithm we expect a curve25519 key.
- key_name = "curve25519:%s" % (device_id,)
- current_keys = [keys.get(key_name)]
- else:
- # We don't know understand the algorithm, so we just
- # check it matches a key for the device.
- current_keys = keys.values()
- elif device_id:
- # We don't have any keys for the device ID.
- pass
- else:
- # The event didn't include a device ID, so we just look for
- # keys across all devices.
- current_keys = [
- key
- for device in cached_devices.values()
- for key in device.get("keys", {}).get("keys", {}).values()
- ]
-
- # We now check that the sender key matches (one of) the expected
- # keys.
- if sender_key not in current_keys:
- logger.info(
- "Received event from remote device with unexpected sender key: %s %s: %s",
- event.sender,
- device_id or "<no device_id>",
- sender_key,
- )
- resync = True
-
- if resync:
- run_as_background_process(
- "resync_device_due_to_pdu", self._resync_device, event.sender
- )
-
- await self._handle_marker_event(origin, event)
-
- async def _handle_marker_event(self, origin: str, marker_event: EventBase):
- """Handles backfilling the insertion event when we receive a marker
- event that points to one.
-
- Args:
- origin: Origin of the event. Will be called to get the insertion event
- marker_event: The event to process
- """
-
- if marker_event.type != EventTypes.MSC2716_MARKER:
- # Not a marker event
- return
-
- if marker_event.rejected_reason is not None:
- # Rejected event
- return
-
- # Skip processing a marker event if the room version doesn't
- # support it.
- room_version = await self.store.get_room_version(marker_event.room_id)
- if not room_version.msc2716_historical:
- return
-
- logger.debug("_handle_marker_event: received %s", marker_event)
-
- insertion_event_id = marker_event.content.get(
- EventContentFields.MSC2716_MARKER_INSERTION
- )
-
- if insertion_event_id is None:
- # Nothing to retrieve then (invalid marker)
- return
-
- logger.debug(
- "_handle_marker_event: backfilling insertion event %s", insertion_event_id
- )
-
- await self._get_events_and_persist(
- origin,
- marker_event.room_id,
- [insertion_event_id],
- )
-
- insertion_event = await self.store.get_event(
- insertion_event_id, allow_none=True
- )
- if insertion_event is None:
- logger.warning(
- "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
- origin,
- insertion_event_id,
- marker_event.event_id,
- )
- return
-
- logger.debug(
- "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
- insertion_event,
- marker_event,
- )
-
- await self.store.insert_insertion_extremity(
- insertion_event_id, marker_event.room_id
- )
-
- logger.debug(
- "_handle_marker_event: insertion extremity added for %s from marker event %s",
- insertion_event,
- marker_event,
- )
-
- async def _resync_device(self, sender: str) -> None:
- """We have detected that the device list for the given user may be out
- of sync, so we try and resync them.
- """
-
- try:
- await self.store.mark_remote_user_device_cache_as_stale(sender)
-
- # Immediately attempt a resync in the background
- if self.config.worker_app:
- await self._user_device_resync(user_id=sender)
- else:
- await self._device_list_updater.user_device_resync(sender)
- except Exception:
- logger.exception("Failed to resync device for %s", sender)
-
- @log_function
- async def backfill(
- self, dest: str, room_id: str, limit: int, extremities: List[str]
- ) -> List[EventBase]:
- """Trigger a backfill request to `dest` for the given `room_id`
-
- This will attempt to get more events from the remote. If the other side
- has no new events to offer, this will return an empty list.
-
- As the events are received, we check their signatures, and also do some
- sanity-checking on them. If any of the backfilled events are invalid,
- this method throws a SynapseError.
-
- TODO: make this more useful to distinguish failures of the remote
- server from invalid events (there is probably no point in trying to
- re-fetch invalid events from every other HS in the room.)
- """
- if dest == self.server_name:
- raise SynapseError(400, "Can't backfill from self.")
-
- events = await self.federation_client.backfill(
- dest, room_id, limit=limit, extremities=extremities
- )
-
- if not events:
- return []
-
- # ideally we'd sanity check the events here for excess prev_events etc,
- # but it's hard to reject events at this point without completely
- # breaking backfill in the same way that it is currently broken by
- # events whose signature we cannot verify (#3121).
- #
- # So for now we accept the events anyway. #3124 tracks this.
- #
- # for ev in events:
- # self._sanity_check_event(ev)
-
- # Don't bother processing events we already have.
- seen_events = await self.store.have_events_in_timeline(
- {e.event_id for e in events}
- )
-
- events = [e for e in events if e.event_id not in seen_events]
-
- if not events:
- return []
-
- event_map = {e.event_id: e for e in events}
-
- event_ids = {e.event_id for e in events}
-
- # build a list of events whose prev_events weren't in the batch.
- # (XXX: this will include events whose prev_events we already have; that doesn't
- # sound right?)
- edges = [ev.event_id for ev in events if set(ev.prev_event_ids()) - event_ids]
-
- logger.info("backfill: Got %d events with %d edges", len(events), len(edges))
-
- # For each edge get the current state.
-
- state_events = {}
- events_to_state = {}
- for e_id in edges:
- state = await self._get_state_for_room(
- destination=dest,
- room_id=room_id,
- event_id=e_id,
- )
- state_events.update({s.event_id: s for s in state})
- events_to_state[e_id] = state
-
- required_auth = {
- a_id
- for event in events + list(state_events.values())
- for a_id in event.auth_event_ids()
- }
- auth_events = await self.store.get_events(required_auth, allow_rejected=True)
- auth_events.update(
- {e_id: event_map[e_id] for e_id in required_auth if e_id in event_map}
- )
-
- ev_infos = []
-
- # Step 1: persist the events in the chunk we fetched state for (i.e.
- # the backwards extremities), with custom auth events and state
- for e_id in events_to_state:
- # For paranoia we ensure that these events are marked as
- # non-outliers
- ev = event_map[e_id]
- assert not ev.internal_metadata.is_outlier()
-
- ev_infos.append(
- _NewEventInfo(
- event=ev,
- state=events_to_state[e_id],
- claimed_auth_event_map={
- (
- auth_events[a_id].type,
- auth_events[a_id].state_key,
- ): auth_events[a_id]
- for a_id in ev.auth_event_ids()
- if a_id in auth_events
- },
- )
- )
-
- if ev_infos:
- await self._auth_and_persist_events(
- dest, room_id, ev_infos, backfilled=True
- )
-
- # Step 2: Persist the rest of the events in the chunk one by one
- events.sort(key=lambda e: e.depth)
-
- for event in events:
- if event in events_to_state:
- continue
-
- # For paranoia we ensure that these events are marked as
- # non-outliers
- assert not event.internal_metadata.is_outlier()
-
- context = await self.state_handler.compute_event_context(event)
-
- # We store these one at a time since each event depends on the
- # previous to work out the state.
- # TODO: We can probably do something more clever here.
- await self._auth_and_persist_event(dest, event, context, backfilled=True)
-
- return events
-
async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
) -> bool:
@@ -1326,14 +308,14 @@ class FederationHandler(BaseHandler):
# TODO: Should we try multiple of these at a time?
for dom in domains:
try:
- await self.backfill(
+ await self._federation_event_handler.backfill(
dom, room_id, limit=100, extremities=extremities
)
# If this succeeded then we probably already have the
# appropriate stuff.
# TODO: We can probably do something more intelligent here.
return True
- except SynapseError as e:
+ except (SynapseError, InvalidResponseError) as e:
logger.info("Failed to backfill from %s because %s", dom, e)
continue
except HttpResponseException as e:
@@ -1417,115 +399,6 @@ class FederationHandler(BaseHandler):
return False
- async def _get_events_and_persist(
- self, destination: str, room_id: str, events: Iterable[str]
- ) -> None:
- """Fetch the given events from a server, and persist them as outliers.
-
- This function *does not* recursively get missing auth events of the
- newly fetched events. Callers must include in the `events` argument
- any missing events from the auth chain.
-
- Logs a warning if we can't find the given event.
- """
-
- room_version = await self.store.get_room_version(room_id)
-
- event_map: Dict[str, EventBase] = {}
-
- async def get_event(event_id: str):
- with nested_logging_context(event_id):
- try:
- event = await self.federation_client.get_pdu(
- [destination],
- event_id,
- room_version,
- outlier=True,
- )
- if event is None:
- logger.warning(
- "Server %s didn't return event %s",
- destination,
- event_id,
- )
- return
-
- event_map[event.event_id] = event
-
- except Exception as e:
- logger.warning(
- "Error fetching missing state/auth event %s: %s %s",
- event_id,
- type(e),
- e,
- )
-
- await concurrently_execute(get_event, events, 5)
-
- # Make a map of auth events for each event. We do this after fetching
- # all the events as some of the events' auth events will be in the list
- # of requested events.
-
- auth_events = [
- aid
- for event in event_map.values()
- for aid in event.auth_event_ids()
- if aid not in event_map
- ]
- persisted_events = await self.store.get_events(
- auth_events,
- allow_rejected=True,
- )
-
- event_infos = []
- for event in event_map.values():
- auth = {}
- for auth_event_id in event.auth_event_ids():
- ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
- if ae:
- auth[(ae.type, ae.state_key)] = ae
- else:
- logger.info("Missing auth event %s", auth_event_id)
-
- event_infos.append(_NewEventInfo(event, None, auth))
-
- if event_infos:
- await self._auth_and_persist_events(
- destination,
- room_id,
- event_infos,
- )
-
- def _sanity_check_event(self, ev: EventBase) -> None:
- """
- Do some early sanity checks of a received event
-
- In particular, checks it doesn't have an excessive number of
- prev_events or auth_events, which could cause a huge state resolution
- or cascade of event fetches.
-
- Args:
- ev: event to be checked
-
- Raises:
- SynapseError if the event does not pass muster
- """
- if len(ev.prev_event_ids()) > 20:
- logger.warning(
- "Rejecting event %s which has %i prev_events",
- ev.event_id,
- len(ev.prev_event_ids()),
- )
- raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
-
- if len(ev.auth_event_ids()) > 10:
- logger.warning(
- "Rejecting event %s which has %i auth_events",
- ev.event_id,
- len(ev.auth_event_ids()),
- )
- raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
-
async def send_invite(self, target_host: str, event: EventBase) -> EventBase:
"""Sends the invite to the remote server for signing.
@@ -1591,9 +464,9 @@ class FederationHandler(BaseHandler):
# This shouldn't happen, because the RoomMemberHandler has a
# linearizer lock which only allows one operation per user per room
# at a time - so this is just paranoia.
- assert room_id not in self.room_queues
+ assert room_id not in self._federation_event_handler.room_queues
- self.room_queues[room_id] = []
+ self._federation_event_handler.room_queues[room_id] = []
await self._clean_room_for_join(room_id)
@@ -1667,8 +540,8 @@ class FederationHandler(BaseHandler):
logger.debug("Finished joining %s to %s", joinee, room_id)
return event.event_id, max_stream_id
finally:
- room_queue = self.room_queues[room_id]
- del self.room_queues[room_id]
+ room_queue = self._federation_event_handler.room_queues[room_id]
+ del self._federation_event_handler.room_queues[room_id]
# we don't need to wait for the queued events to be processed -
# it's just a best-effort thing at this point. We do want to do
@@ -1744,7 +617,7 @@ class FederationHandler(BaseHandler):
event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
context = await self.state_handler.compute_event_context(event)
- stream_id = await self.persist_events_and_notify(
+ stream_id = await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
)
return event.event_id, stream_id
@@ -1764,7 +637,7 @@ class FederationHandler(BaseHandler):
p,
)
with nested_logging_context(p.event_id):
- await self.on_receive_pdu(origin, p, sent_to_us_directly=True)
+ await self._federation_event_handler.on_receive_pdu(origin, p)
except Exception as e:
logger.warning(
"Error handling queued PDU %s from %s: %s", p.event_id, origin, e
@@ -1857,7 +730,7 @@ class FederationHandler(BaseHandler):
raise
# Ensure the user can even join the room.
- await self._check_join_restrictions(context, event)
+ await self._federation_event_handler.check_join_restrictions(context, event)
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_join_request`
@@ -1934,7 +807,9 @@ class FederationHandler(BaseHandler):
)
context = await self.state_handler.compute_event_context(event)
- await self.persist_events_and_notify(event.room_id, [(event, context)])
+ await self._federation_event_handler.persist_events_and_notify(
+ event.room_id, [(event, context)]
+ )
return event
@@ -1961,7 +836,7 @@ class FederationHandler(BaseHandler):
await self.federation_client.send_leave(host_list, event)
context = await self.state_handler.compute_event_context(event)
- stream_id = await self.persist_events_and_notify(
+ stream_id = await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
)
@@ -2104,116 +979,6 @@ class FederationHandler(BaseHandler):
return event
- @log_function
- async def on_send_membership_event(
- self, origin: str, event: EventBase
- ) -> Tuple[EventBase, EventContext]:
- """
- We have received a join/leave/knock event for a room via send_join/leave/knock.
-
- Verify that event and send it into the room on the remote homeserver's behalf.
-
- This is quite similar to on_receive_pdu, with the following principal
- differences:
- * only membership events are permitted (and only events with
- sender==state_key -- ie, no kicks or bans)
- * *We* send out the event on behalf of the remote server.
- * We enforce the membership restrictions of restricted rooms.
- * Rejected events result in an exception rather than being stored.
-
- There are also other differences, however it is not clear if these are by
- design or omission. In particular, we do not attempt to backfill any missing
- prev_events.
-
- Args:
- origin: The homeserver of the remote (joining/invited/knocking) user.
- event: The member event that has been signed by the remote homeserver.
-
- Returns:
- The event and context of the event after inserting it into the room graph.
-
- Raises:
- SynapseError if the event is not accepted into the room
- """
- logger.debug(
- "on_send_membership_event: Got event: %s, signatures: %s",
- event.event_id,
- event.signatures,
- )
-
- if get_domain_from_id(event.sender) != origin:
- logger.info(
- "Got send_membership request for user %r from different origin %s",
- event.sender,
- origin,
- )
- raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
-
- if event.sender != event.state_key:
- raise SynapseError(400, "state_key and sender must match", Codes.BAD_JSON)
-
- assert not event.internal_metadata.outlier
-
- # Send this event on behalf of the other server.
- #
- # The remote server isn't a full participant in the room at this point, so
- # may not have an up-to-date list of the other homeservers participating in
- # the room, so we send it on their behalf.
- event.internal_metadata.send_on_behalf_of = origin
-
- context = await self.state_handler.compute_event_context(event)
- context = await self._check_event_auth(origin, event, context)
- if context.rejected:
- raise SynapseError(
- 403, f"{event.membership} event was rejected", Codes.FORBIDDEN
- )
-
- # for joins, we need to check the restrictions of restricted rooms
- if event.membership == Membership.JOIN:
- await self._check_join_restrictions(context, event)
-
- # for knock events, we run the third-party event rules. It's not entirely clear
- # why we don't do this for other sorts of membership events.
- if event.membership == Membership.KNOCK:
- event_allowed, _ = await self.third_party_event_rules.check_event_allowed(
- event, context
- )
- if not event_allowed:
- logger.info("Sending of knock %s forbidden by third-party rules", event)
- raise SynapseError(
- 403, "This event is not allowed in this context", Codes.FORBIDDEN
- )
-
- # all looks good, we can persist the event.
- await self._run_push_actions_and_persist_event(event, context)
- return event, context
-
- async def _check_join_restrictions(
- self, context: EventContext, event: EventBase
- ) -> None:
- """Check that restrictions in restricted join rules are matched
-
- Called when we receive a join event via send_join.
-
- Raises an auth error if the restrictions are not matched.
- """
- prev_state_ids = await context.get_prev_state_ids()
-
- # Check if the user is already in the room or invited to the room.
- user_id = event.state_key
- prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
- prev_member_event = None
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
-
- # Check if the member should be allowed access via membership in a space.
- await self._event_auth_handler.check_restricted_join_rules(
- prev_state_ids,
- event.room_version,
- user_id,
- prev_member_event,
- )
-
async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]:
"""Returns the state at the event. i.e. not including said event."""
@@ -2314,131 +1079,6 @@ class FederationHandler(BaseHandler):
else:
return None
- async def get_min_depth_for_context(self, context: str) -> int:
- return await self.store.get_min_depth(context)
-
- async def _auth_and_persist_event(
- self,
- origin: str,
- event: EventBase,
- context: EventContext,
- state: Optional[Iterable[EventBase]] = None,
- claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
- backfilled: bool = False,
- ) -> None:
- """
- Process an event by performing auth checks and then persisting to the database.
-
- Args:
- origin: The host the event originates from.
- event: The event itself.
- context:
- The event context.
-
- state:
- The state events used to check the event for soft-fail. If this is
- not provided the current state events will be used.
-
- claimed_auth_event_map:
- A map of (type, state_key) => event for the event's claimed auth_events.
- Possibly incomplete, and possibly including events that are not yet
- persisted, or authed, or in the right room.
-
- Only populated where we may not already have persisted these events -
- for example, when populating outliers.
-
- backfilled: True if the event was backfilled.
- """
- context = await self._check_event_auth(
- origin,
- event,
- context,
- state=state,
- claimed_auth_event_map=claimed_auth_event_map,
- backfilled=backfilled,
- )
-
- await self._run_push_actions_and_persist_event(event, context, backfilled)
-
- async def _run_push_actions_and_persist_event(
- self, event: EventBase, context: EventContext, backfilled: bool = False
- ):
- """Run the push actions for a received event, and persist it.
-
- Args:
- event: The event itself.
- context: The event context.
- backfilled: True if the event was backfilled.
- """
- try:
- if (
- not event.internal_metadata.is_outlier()
- and not backfilled
- and not context.rejected
- ):
- await self.action_generator.handle_push_actions_for_event(
- event, context
- )
-
- await self.persist_events_and_notify(
- event.room_id, [(event, context)], backfilled=backfilled
- )
- except Exception:
- run_in_background(
- self.store.remove_push_actions_from_staging, event.event_id
- )
- raise
-
- async def _auth_and_persist_events(
- self,
- origin: str,
- room_id: str,
- event_infos: Collection[_NewEventInfo],
- backfilled: bool = False,
- ) -> None:
- """Creates the appropriate contexts and persists events. The events
- should not depend on one another, e.g. this should be used to persist
- a bunch of outliers, but not a chunk of individual events that depend
- on each other for state calculations.
-
- Notifies about the events where appropriate.
- """
-
- if not event_infos:
- return
-
- async def prep(ev_info: _NewEventInfo):
- event = ev_info.event
- with nested_logging_context(suffix=event.event_id):
- res = await self.state_handler.compute_event_context(
- event, old_state=ev_info.state
- )
- res = await self._check_event_auth(
- origin,
- event,
- res,
- state=ev_info.state,
- claimed_auth_event_map=ev_info.claimed_auth_event_map,
- backfilled=backfilled,
- )
- return res
-
- contexts = await make_deferred_yieldable(
- defer.gatherResults(
- [run_in_background(prep, ev_info) for ev_info in event_infos],
- consumeErrors=True,
- )
- )
-
- await self.persist_events_and_notify(
- room_id,
- [
- (ev_info.event, context)
- for ev_info, context in zip(event_infos, contexts)
- ],
- backfilled=backfilled,
- )
-
async def _persist_auth_tree(
self,
origin: str,
@@ -2536,7 +1176,7 @@ class FederationHandler(BaseHandler):
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
if auth_events or state:
- await self.persist_events_and_notify(
+ await self._federation_event_handler.persist_events_and_notify(
room_id,
[
(e, events_to_context[e.event_id])
@@ -2548,108 +1188,10 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- return await self.persist_events_and_notify(
+ return await self._federation_event_handler.persist_events_and_notify(
room_id, [(event, new_event_context)]
)
- async def _check_for_soft_fail(
- self,
- event: EventBase,
- state: Optional[Iterable[EventBase]],
- backfilled: bool,
- origin: str,
- ) -> None:
- """Checks if we should soft fail the event; if so, marks the event as
- such.
-
- Args:
- event
- state: The state at the event if we don't have all the event's prev events
- backfilled: Whether the event is from backfill
- origin: The host the event originates from.
- """
- # For new (non-backfilled and non-outlier) events we check if the event
- # passes auth based on the current state. If it doesn't then we
- # "soft-fail" the event.
- if backfilled or event.internal_metadata.is_outlier():
- return
-
- extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id)
- extrem_ids = set(extrem_ids_list)
- prev_event_ids = set(event.prev_event_ids())
-
- if extrem_ids == prev_event_ids:
- # If they're the same then the current state is the same as the
- # state at the event, so no point rechecking auth for soft fail.
- return
-
- room_version = await self.store.get_room_version_id(event.room_id)
- room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
- # Calculate the "current state".
- if state is not None:
- # If we're explicitly given the state then we won't have all the
- # prev events, and so we have a gap in the graph. In this case
- # we want to be a little careful as we might have been down for
- # a while and have an incorrect view of the current state,
- # however we still want to do checks as gaps are easy to
- # maliciously manufacture.
- #
- # So we use a "current state" that is actually a state
- # resolution across the current forward extremities and the
- # given state at the event. This should correctly handle cases
- # like bans, especially with state res v2.
-
- state_sets_d = await self.state_store.get_state_groups(
- event.room_id, extrem_ids
- )
- state_sets: List[Iterable[EventBase]] = list(state_sets_d.values())
- state_sets.append(state)
- current_states = await self.state_handler.resolve_events(
- room_version, state_sets, event
- )
- current_state_ids: StateMap[str] = {
- k: e.event_id for k, e in current_states.items()
- }
- else:
- current_state_ids = await self.state_handler.get_current_state_ids(
- event.room_id, latest_event_ids=extrem_ids
- )
-
- logger.debug(
- "Doing soft-fail check for %s: state %s",
- event.event_id,
- current_state_ids,
- )
-
- # Now check if event pass auth against said current state
- auth_types = auth_types_for_event(room_version_obj, event)
- current_state_ids_list = [
- e for k, e in current_state_ids.items() if k in auth_types
- ]
-
- auth_events_map = await self.store.get_events(current_state_ids_list)
- current_auth_events = {
- (e.type, e.state_key): e for e in auth_events_map.values()
- }
-
- try:
- event_auth.check(room_version_obj, event, auth_events=current_auth_events)
- except AuthError as e:
- logger.warning(
- "Soft-failing %r (from %s) because %s",
- event,
- e,
- origin,
- extra={
- "room_id": event.room_id,
- "mxid": event.sender,
- "hs": origin,
- },
- )
- soft_failed_event_counter.inc()
- event.internal_metadata.soft_failed = True
-
async def on_get_missing_events(
self,
origin: str,
@@ -2678,334 +1220,6 @@ class FederationHandler(BaseHandler):
return missing_events
- async def _check_event_auth(
- self,
- origin: str,
- event: EventBase,
- context: EventContext,
- state: Optional[Iterable[EventBase]] = None,
- claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
- backfilled: bool = False,
- ) -> EventContext:
- """
- Checks whether an event should be rejected (for failing auth checks).
-
- Args:
- origin: The host the event originates from.
- event: The event itself.
- context:
- The event context.
-
- state:
- The state events used to check the event for soft-fail. If this is
- not provided the current state events will be used.
-
- claimed_auth_event_map:
- A map of (type, state_key) => event for the event's claimed auth_events.
- Possibly incomplete, and possibly including events that are not yet
- persisted, or authed, or in the right room.
-
- Only populated where we may not already have persisted these events -
- for example, when populating outliers, or the state for a backwards
- extremity.
-
- backfilled: True if the event was backfilled.
-
- Returns:
- The updated context object.
- """
- room_version = await self.store.get_room_version_id(event.room_id)
- room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
- if claimed_auth_event_map:
- # if we have a copy of the auth events from the event, use that as the
- # basis for auth.
- auth_events = claimed_auth_event_map
- else:
- # otherwise, we calculate what the auth events *should* be, and use that
- prev_state_ids = await context.get_prev_state_ids()
- auth_events_ids = self._event_auth_handler.compute_auth_events(
- event, prev_state_ids, for_verification=True
- )
- auth_events_x = await self.store.get_events(auth_events_ids)
- auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
-
- try:
- (
- context,
- auth_events_for_auth,
- ) = await self._update_auth_events_and_context_for_auth(
- origin, event, context, auth_events
- )
- except Exception:
- # We don't really mind if the above fails, so lets not fail
- # processing if it does. However, it really shouldn't fail so
- # let's still log as an exception since we'll still want to fix
- # any bugs.
- logger.exception(
- "Failed to double check auth events for %s with remote. "
- "Ignoring failure and continuing processing of event.",
- event.event_id,
- )
- auth_events_for_auth = auth_events
-
- try:
- event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth)
- except AuthError as e:
- logger.warning("Failed auth resolution for %r because %s", event, e)
- context.rejected = RejectedReason.AUTH_ERROR
-
- if not context.rejected:
- await self._check_for_soft_fail(event, state, backfilled, origin=origin)
-
- if event.type == EventTypes.GuestAccess and not context.rejected:
- await self.maybe_kick_guest_users(event)
-
- # If we are going to send this event over federation we precaclculate
- # the joined hosts.
- if event.internal_metadata.get_send_on_behalf_of():
- await self.event_creation_handler.cache_joined_hosts_for_event(
- event, context
- )
-
- return context
-
- async def _update_auth_events_and_context_for_auth(
- self,
- origin: str,
- event: EventBase,
- context: EventContext,
- input_auth_events: StateMap[EventBase],
- ) -> Tuple[EventContext, StateMap[EventBase]]:
- """Helper for _check_event_auth. See there for docs.
-
- Checks whether a given event has the expected auth events. If it
- doesn't then we talk to the remote server to compare state to see if
- we can come to a consensus (e.g. if one server missed some valid
- state).
-
- This attempts to resolve any potential divergence of state between
- servers, but is not essential and so failures should not block further
- processing of the event.
-
- Args:
- origin:
- event:
- context:
-
- input_auth_events:
- Map from (event_type, state_key) to event
-
- Normally, our calculated auth_events based on the state of the room
- at the event's position in the DAG, though occasionally (eg if the
- event is an outlier), may be the auth events claimed by the remote
- server.
-
- Returns:
- updated context, updated auth event map
- """
- # take a copy of input_auth_events before we modify it.
- auth_events: MutableStateMap[EventBase] = dict(input_auth_events)
-
- event_auth_events = set(event.auth_event_ids())
-
- # missing_auth is the set of the event's auth_events which we don't yet have
- # in auth_events.
- missing_auth = event_auth_events.difference(
- e.event_id for e in auth_events.values()
- )
-
- # if we have missing events, we need to fetch those events from somewhere.
- #
- # we start by checking if they are in the store, and then try calling /event_auth/.
- if missing_auth:
- have_events = await self.store.have_seen_events(event.room_id, missing_auth)
- logger.debug("Events %s are in the store", have_events)
- missing_auth.difference_update(have_events)
-
- if missing_auth:
- # If we don't have all the auth events, we need to get them.
- logger.info("auth_events contains unknown events: %s", missing_auth)
- try:
- try:
- remote_auth_chain = await self.federation_client.get_event_auth(
- origin, event.room_id, event.event_id
- )
- except RequestSendFailed as e1:
- # The other side isn't around or doesn't implement the
- # endpoint, so lets just bail out.
- logger.info("Failed to get event auth from remote: %s", e1)
- return context, auth_events
-
- seen_remotes = await self.store.have_seen_events(
- event.room_id, [e.event_id for e in remote_auth_chain]
- )
-
- for e in remote_auth_chain:
- if e.event_id in seen_remotes:
- continue
-
- if e.event_id == event.event_id:
- continue
-
- try:
- auth_ids = e.auth_event_ids()
- auth = {
- (e.type, e.state_key): e
- for e in remote_auth_chain
- if e.event_id in auth_ids or e.type == EventTypes.Create
- }
- e.internal_metadata.outlier = True
-
- logger.debug(
- "_check_event_auth %s missing_auth: %s",
- event.event_id,
- e.event_id,
- )
- missing_auth_event_context = (
- await self.state_handler.compute_event_context(e)
- )
- await self._auth_and_persist_event(
- origin,
- e,
- missing_auth_event_context,
- claimed_auth_event_map=auth,
- )
-
- if e.event_id in event_auth_events:
- auth_events[(e.type, e.state_key)] = e
- except AuthError:
- pass
-
- except Exception:
- logger.exception("Failed to get auth chain")
-
- if event.internal_metadata.is_outlier():
- # XXX: given that, for an outlier, we'll be working with the
- # event's *claimed* auth events rather than those we calculated:
- # (a) is there any point in this test, since different_auth below will
- # obviously be empty
- # (b) alternatively, why don't we do it earlier?
- logger.info("Skipping auth_event fetch for outlier")
- return context, auth_events
-
- different_auth = event_auth_events.difference(
- e.event_id for e in auth_events.values()
- )
-
- if not different_auth:
- return context, auth_events
-
- logger.info(
- "auth_events refers to events which are not in our calculated auth "
- "chain: %s",
- different_auth,
- )
-
- # XXX: currently this checks for redactions but I'm not convinced that is
- # necessary?
- different_events = await self.store.get_events_as_list(different_auth)
-
- for d in different_events:
- if d.room_id != event.room_id:
- logger.warning(
- "Event %s refers to auth_event %s which is in a different room",
- event.event_id,
- d.event_id,
- )
-
- # don't attempt to resolve the claimed auth events against our own
- # in this case: just use our own auth events.
- #
- # XXX: should we reject the event in this case? It feels like we should,
- # but then shouldn't we also do so if we've failed to fetch any of the
- # auth events?
- return context, auth_events
-
- # now we state-resolve between our own idea of the auth events, and the remote's
- # idea of them.
-
- local_state = auth_events.values()
- remote_auth_events = dict(auth_events)
- remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
- remote_state = remote_auth_events.values()
-
- room_version = await self.store.get_room_version_id(event.room_id)
- new_state = await self.state_handler.resolve_events(
- room_version, (local_state, remote_state), event
- )
-
- logger.info(
- "After state res: updating auth_events with new state %s",
- {
- (d.type, d.state_key): d.event_id
- for d in new_state.values()
- if auth_events.get((d.type, d.state_key)) != d
- },
- )
-
- auth_events.update(new_state)
-
- context = await self._update_context_for_auth_events(
- event, context, auth_events
- )
-
- return context, auth_events
-
- async def _update_context_for_auth_events(
- self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
- ) -> EventContext:
- """Update the state_ids in an event context after auth event resolution,
- storing the changes as a new state group.
-
- Args:
- event: The event we're handling the context for
-
- context: initial event context
-
- auth_events: Events to update in the event context.
-
- Returns:
- new event context
- """
- # exclude the state key of the new event from the current_state in the context.
- if event.is_state():
- event_key: Optional[Tuple[str, str]] = (event.type, event.state_key)
- else:
- event_key = None
- state_updates = {
- k: a.event_id for k, a in auth_events.items() if k != event_key
- }
-
- current_state_ids = await context.get_current_state_ids()
- current_state_ids = dict(current_state_ids) # type: ignore
-
- current_state_ids.update(state_updates)
-
- prev_state_ids = await context.get_prev_state_ids()
- prev_state_ids = dict(prev_state_ids)
-
- prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
-
- # create a new state group as a delta from the existing one.
- prev_group = context.state_group
- state_group = await self.state_store.store_state_group(
- event.event_id,
- event.room_id,
- prev_group=prev_group,
- delta_ids=state_updates,
- current_state_ids=current_state_ids,
- )
-
- return EventContext.with_state(
- state_group=state_group,
- state_group_before_event=context.state_group_before_event,
- current_state_ids=current_state_ids,
- prev_state_ids=prev_state_ids,
- prev_group=prev_group,
- delta_ids=state_updates,
- )
-
async def construct_auth_difference(
self, local_auth: Iterable[EventBase], remote_auth: Iterable[EventBase]
) -> Dict:
@@ -3392,99 +1606,6 @@ class FederationHandler(BaseHandler):
if "valid" not in response or not response["valid"]:
raise AuthError(403, "Third party certificate was invalid")
- async def persist_events_and_notify(
- self,
- room_id: str,
- event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
- backfilled: bool = False,
- ) -> int:
- """Persists events and tells the notifier/pushers about them, if
- necessary.
-
- Args:
- room_id: The room ID of events being persisted.
- event_and_contexts: Sequence of events with their associated
- context that should be persisted. All events must belong to
- the same room.
- backfilled: Whether these events are a result of
- backfilling or not
-
- Returns:
- The stream ID after which all events have been persisted.
- """
- if not event_and_contexts:
- return self.store.get_current_events_token()
-
- instance = self.config.worker.events_shard_config.get_instance(room_id)
- if instance != self._instance_name:
- # Limit the number of events sent over replication. We choose 200
- # here as that is what we default to in `max_request_body_size(..)`
- for batch in batch_iter(event_and_contexts, 200):
- result = await self._send_events(
- instance_name=instance,
- store=self.store,
- room_id=room_id,
- event_and_contexts=batch,
- backfilled=backfilled,
- )
- return result["max_stream_id"]
- else:
- assert self.storage.persistence
-
- # Note that this returns the events that were persisted, which may not be
- # the same as were passed in if some were deduplicated due to transaction IDs.
- events, max_stream_token = await self.storage.persistence.persist_events(
- event_and_contexts, backfilled=backfilled
- )
-
- if self._ephemeral_messages_enabled:
- for event in events:
- # If there's an expiry timestamp on the event, schedule its expiry.
- self._message_handler.maybe_schedule_expiry(event)
-
- if not backfilled: # Never notify for backfilled events
- for event in events:
- await self._notify_persisted_event(event, max_stream_token)
-
- return max_stream_token.stream
-
- async def _notify_persisted_event(
- self, event: EventBase, max_stream_token: RoomStreamToken
- ) -> None:
- """Checks to see if notifier/pushers should be notified about the
- event or not.
-
- Args:
- event:
- max_stream_id: The max_stream_id returned by persist_events
- """
-
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
-
- # We notify for memberships if its an invite for one of our
- # users
- if event.internal_metadata.is_outlier():
- if event.membership != Membership.INVITE:
- if not self.is_mine_id(target_user_id):
- return
-
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
- elif event.internal_metadata.is_outlier():
- return
-
- # the event has been persisted so it should have a stream ordering.
- assert event.internal_metadata.stream_ordering
-
- event_pos = PersistedEventPosition(
- self._instance_name, event.internal_metadata.stream_ordering
- )
- self.notifier.on_new_room_event(
- event, event_pos, max_stream_token, extra_users=extra_users
- )
-
async def _clean_room_for_join(self, room_id: str) -> None:
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
new file mode 100644
index 0000000000..9f055f00cf
--- /dev/null
+++ b/synapse/handlers/federation_event.py
@@ -0,0 +1,1825 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from http import HTTPStatus
+from typing import (
+ TYPE_CHECKING,
+ Collection,
+ Container,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+)
+
+import attr
+from prometheus_client import Counter
+
+from twisted.internet import defer
+
+from synapse import event_auth
+from synapse.api.constants import (
+ EventContentFields,
+ EventTypes,
+ Membership,
+ RejectedReason,
+ RoomEncryptionAlgorithms,
+)
+from synapse.api.errors import (
+ AuthError,
+ Codes,
+ FederationError,
+ HttpResponseException,
+ RequestSendFailed,
+ SynapseError,
+)
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.event_auth import auth_types_for_event
+from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
+from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers._base import BaseHandler
+from synapse.logging.context import (
+ make_deferred_yieldable,
+ nested_logging_context,
+ run_in_background,
+)
+from synapse.logging.utils import log_function
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
+from synapse.replication.http.federation import (
+ ReplicationFederationSendEventsRestServlet,
+)
+from synapse.state import StateResolutionStore
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
+from synapse.types import (
+ MutableStateMap,
+ PersistedEventPosition,
+ RoomStreamToken,
+ StateMap,
+ UserID,
+ get_domain_from_id,
+)
+from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util.iterutils import batch_iter
+from synapse.util.retryutils import NotRetryingDestination
+from synapse.util.stringutils import shortstr
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+logger = logging.getLogger(__name__)
+
+soft_failed_event_counter = Counter(
+ "synapse_federation_soft_failed_events_total",
+ "Events received over federation that we marked as soft_failed",
+)
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _NewEventInfo:
+ """Holds information about a received event, ready for passing to _auth_and_persist_events
+
+ Attributes:
+ event: the received event
+
+ claimed_auth_event_map: a map of (type, state_key) => event for the event's
+ claimed auth_events.
+
+ This can include events which have not yet been persisted, in the case that
+ we are backfilling a batch of events.
+
+        Note: may be incomplete if we were unable to find all of the claimed auth
+        events. Also, treat the contents with caution: the events might have been
+        rejected, might not yet have been authorized themselves, or they might be
+        in the wrong room.
+
+ """
+
+ event: EventBase
+ claimed_auth_event_map: StateMap[EventBase]
+
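+# Illustrative sketch only: constructing a _NewEventInfo. The `event` and
+# `auth_map` names below are assumed placeholders for a fetched EventBase and
+# its claimed auth events keyed by (type, state_key):
+#
+#     info = _NewEventInfo(event=event, claimed_auth_event_map=auth_map)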
+
+class FederationEventHandler(BaseHandler):
+ """Handles events that originated from federation.
+
+    Responsible for handling incoming events and passing them on to the rest
+ of the homeserver (including auth and state conflict resolutions)
+ """
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.store = hs.get_datastore()
+ self.storage = hs.get_storage()
+ self.state_store = self.storage.state
+
+ self.state_handler = hs.get_state_handler()
+ self.event_creation_handler = hs.get_event_creation_handler()
+ self._event_auth_handler = hs.get_event_auth_handler()
+ self._message_handler = hs.get_message_handler()
+ self.action_generator = hs.get_action_generator()
+ self._state_resolution_handler = hs.get_state_resolution_handler()
+
+ self.federation_client = hs.get_federation_client()
+ self.third_party_event_rules = hs.get_third_party_event_rules()
+
+ self.is_mine_id = hs.is_mine_id
+ self._instance_name = hs.get_instance_name()
+
+ self.config = hs.config
+ self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages
+
+ self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
+ if hs.config.worker_app:
+ self._user_device_resync = (
+ ReplicationUserDevicesResyncRestServlet.make_client(hs)
+ )
+ else:
+ self._device_list_updater = hs.get_device_handler().device_list_updater
+
+ # When joining a room we need to queue any events for that room up.
+ # For each room, a list of (pdu, origin) tuples.
+ # TODO: replace this with something more elegant, probably based around the
+ # federation event staging area.
+ self.room_queues: Dict[str, List[Tuple[EventBase, str]]] = {}
+
+ self._room_pdu_linearizer = Linearizer("fed_room_pdu")
+
+ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
+ """Process a PDU received via a federation /send/ transaction
+
+ Args:
+ origin: server which initiated the /send/ transaction. Will
+ be used to fetch missing events or state.
+ pdu: received PDU
+ """
+
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
+ # We reprocess pdus when we have seen them only as outliers
+ existing = await self.store.get_event(
+ event_id, allow_none=True, allow_rejected=True
+ )
+
+ # FIXME: Currently we fetch an event again when we already have it
+ # if it has been marked as an outlier.
+ if existing:
+ if not existing.internal_metadata.is_outlier():
+ logger.info(
+ "Ignoring received event %s which we have already seen", event_id
+ )
+ return
+ if pdu.internal_metadata.is_outlier():
+ logger.info(
+ "Ignoring received outlier %s which we already have as an outlier",
+ event_id,
+ )
+ return
+ logger.info("De-outliering event %s", event_id)
+
+ # do some initial sanity-checking of the event. In particular, make
+ # sure it doesn't have hundreds of prev_events or auth_events, which
+ # could cause a huge state resolution or cascade of event fetches.
+ try:
+ self._sanity_check_event(pdu)
+ except SynapseError as err:
+ logger.warning("Received event failed sanity checks")
+ raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
+
+ # If we are currently in the process of joining this room, then we
+ # queue up events for later processing.
+ if room_id in self.room_queues:
+ logger.info(
+ "Queuing PDU from %s for now: join in progress",
+ origin,
+ )
+ self.room_queues[room_id].append((pdu, origin))
+ return
+
+ # If we're not in the room just ditch the event entirely. This is
+ # probably an old server that has come back and thinks we're still in
+ # the room (or we've been rejoined to the room by a state reset).
+ #
+ # Note that if we were never in the room then we would have already
+ # dropped the event, since we wouldn't know the room version.
+ is_in_room = await self._event_auth_handler.check_host_in_room(
+ room_id, self.server_name
+ )
+ if not is_in_room:
+ logger.info(
+ "Ignoring PDU from %s as we're not in the room",
+ origin,
+ )
+ return None
+
+ # Check that the event passes auth based on the state at the event. This is
+ # done for events that are to be added to the timeline (non-outliers).
+ #
+ # Get missing pdus if necessary:
+ # - Fetching any missing prev events to fill in gaps in the graph
+ # - Fetching state if we have a hole in the graph
+ if not pdu.internal_metadata.is_outlier():
+ prevs = set(pdu.prev_event_ids())
+ seen = await self.store.have_events_in_timeline(prevs)
+ missing_prevs = prevs - seen
+
+ if missing_prevs:
+ # We only backfill backwards to the min depth.
+ min_depth = await self.get_min_depth_for_context(pdu.room_id)
+ logger.debug("min_depth: %d", min_depth)
+
+ if min_depth is not None and pdu.depth > min_depth:
+ # If we're missing stuff, ensure we only fetch stuff one
+ # at a time.
+ logger.info(
+ "Acquiring room lock to fetch %d missing prev_events: %s",
+ len(missing_prevs),
+ shortstr(missing_prevs),
+ )
+ with (await self._room_pdu_linearizer.queue(pdu.room_id)):
+ logger.info(
+ "Acquired room lock to fetch %d missing prev_events",
+ len(missing_prevs),
+ )
+
+ try:
+ await self._get_missing_events_for_pdu(
+ origin, pdu, prevs, min_depth
+ )
+ except Exception as e:
+ raise Exception(
+ "Error fetching missing prev_events for %s: %s"
+ % (event_id, e)
+ ) from e
+
+ # Update the set of things we've seen after trying to
+ # fetch the missing stuff
+ seen = await self.store.have_events_in_timeline(prevs)
+ missing_prevs = prevs - seen
+
+ if not missing_prevs:
+ logger.info("Found all missing prev_events")
+
+ if missing_prevs:
+ # since this event was pushed to us, it is possible for it to
+ # become the only forward-extremity in the room, and we would then
+ # trust its state to be the state for the whole room. This is very
+ # bad. Further, if the event was pushed to us, there is no excuse
+ # for us not to have all the prev_events. (XXX: apart from
+ # min_depth?)
+ #
+ # We therefore reject any such events.
+ logger.warning(
+ "Rejecting: failed to fetch %d prev events: %s",
+ len(missing_prevs),
+ shortstr(missing_prevs),
+ )
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
+ )
+
+ await self._process_received_pdu(origin, pdu, state=None)
+
+ @log_function
+ async def on_send_membership_event(
+ self, origin: str, event: EventBase
+ ) -> Tuple[EventBase, EventContext]:
+ """
+ We have received a join/leave/knock event for a room via send_join/leave/knock.
+
+ Verify that event and send it into the room on the remote homeserver's behalf.
+
+ This is quite similar to on_receive_pdu, with the following principal
+ differences:
+ * only membership events are permitted (and only events with
+ sender==state_key -- ie, no kicks or bans)
+ * *We* send out the event on behalf of the remote server.
+ * We enforce the membership restrictions of restricted rooms.
+ * Rejected events result in an exception rather than being stored.
+
+        There are also other differences; however, it is not clear whether these are
+        by design or omission. In particular, we do not attempt to backfill any
+        missing prev_events.
+
+ Args:
+ origin: The homeserver of the remote (joining/invited/knocking) user.
+ event: The member event that has been signed by the remote homeserver.
+
+ Returns:
+ The event and context of the event after inserting it into the room graph.
+
+ Raises:
+ SynapseError if the event is not accepted into the room
+ """
+ logger.debug(
+ "on_send_membership_event: Got event: %s, signatures: %s",
+ event.event_id,
+ event.signatures,
+ )
+
+ if get_domain_from_id(event.sender) != origin:
+ logger.info(
+ "Got send_membership request for user %r from different origin %s",
+ event.sender,
+ origin,
+ )
+ raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
+
+ if event.sender != event.state_key:
+ raise SynapseError(400, "state_key and sender must match", Codes.BAD_JSON)
+
+ assert not event.internal_metadata.outlier
+
+ # Send this event on behalf of the other server.
+ #
+ # The remote server isn't a full participant in the room at this point, so
+ # may not have an up-to-date list of the other homeservers participating in
+ # the room, so we send it on their behalf.
+ event.internal_metadata.send_on_behalf_of = origin
+
+ context = await self.state_handler.compute_event_context(event)
+ context = await self._check_event_auth(origin, event, context)
+ if context.rejected:
+ raise SynapseError(
+ 403, f"{event.membership} event was rejected", Codes.FORBIDDEN
+ )
+
+ # for joins, we need to check the restrictions of restricted rooms
+ if event.membership == Membership.JOIN:
+ await self.check_join_restrictions(context, event)
+
+ # for knock events, we run the third-party event rules. It's not entirely clear
+ # why we don't do this for other sorts of membership events.
+ if event.membership == Membership.KNOCK:
+ event_allowed, _ = await self.third_party_event_rules.check_event_allowed(
+ event, context
+ )
+ if not event_allowed:
+ logger.info("Sending of knock %s forbidden by third-party rules", event)
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN
+ )
+
+ # all looks good, we can persist the event.
+ await self._run_push_actions_and_persist_event(event, context)
+ return event, context
+
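+    # Illustrative sketch only: the kind of call a send_join/send_knock handler
+    # might make into this method (the handler and event names below are assumed
+    # placeholders):
+    #
+    #     event, context = await federation_event_handler.on_send_membership_event(
+    #         origin, signed_membership_event
+    #     )
+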
+ async def check_join_restrictions(
+ self, context: EventContext, event: EventBase
+ ) -> None:
+ """Check that restrictions in restricted join rules are matched
+
+ Called when we receive a join event via send_join.
+
+ Raises an auth error if the restrictions are not matched.
+ """
+ prev_state_ids = await context.get_prev_state_ids()
+
+ # Check if the user is already in the room or invited to the room.
+ user_id = event.state_key
+ prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
+ prev_member_event = None
+ if prev_member_event_id:
+ prev_member_event = await self.store.get_event(prev_member_event_id)
+
+ # Check if the member should be allowed access via membership in a space.
+ await self._event_auth_handler.check_restricted_join_rules(
+ prev_state_ids,
+ event.room_version,
+ user_id,
+ prev_member_event,
+ )
+
+ @log_function
+ async def backfill(
+ self, dest: str, room_id: str, limit: int, extremities: List[str]
+ ) -> None:
+ """Trigger a backfill request to `dest` for the given `room_id`
+
+ This will attempt to get more events from the remote. If the other side
+ has no new events to offer, this will return an empty list.
+
+ As the events are received, we check their signatures, and also do some
+ sanity-checking on them. If any of the backfilled events are invalid,
+ this method throws a SynapseError.
+
+ We might also raise an InvalidResponseError if the response from the remote
+ server is just bogus.
+
+ TODO: make this more useful to distinguish failures of the remote
+ server from invalid events (there is probably no point in trying to
+ re-fetch invalid events from every other HS in the room.)
+ """
+ if dest == self.server_name:
+ raise SynapseError(400, "Can't backfill from self.")
+
+ events = await self.federation_client.backfill(
+ dest, room_id, limit=limit, extremities=extremities
+ )
+
+ if not events:
+ return
+
+ # if there are any events in the wrong room, the remote server is buggy and
+ # should not be trusted.
+ for ev in events:
+ if ev.room_id != room_id:
+ raise InvalidResponseError(
+ f"Remote server {dest} returned event {ev.event_id} which is in "
+ f"room {ev.room_id}, when we were backfilling in {room_id}"
+ )
+
+ await self._process_pulled_events(dest, events, backfilled=True)
+
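+    # Illustrative sketch only: one way a caller might drive `backfill`. The
+    # destination, limit and extremity id below are assumed placeholders:
+    #
+    #     await federation_event_handler.backfill(
+    #         "other.example.com", room_id, limit=100, extremities=["$an_extremity"]
+    #     )
+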
+ async def _get_missing_events_for_pdu(
+ self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
+ ) -> None:
+ """
+ Args:
+ origin: Origin of the pdu. Will be called to get the missing events
+ pdu: received pdu
+            prevs: Set of event ids which we are missing
+ min_depth: Minimum depth of events to return.
+ """
+
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
+ seen = await self.store.have_events_in_timeline(prevs)
+
+ if not prevs - seen:
+ return
+
+ latest_list = await self.store.get_latest_event_ids_in_room(room_id)
+
+ # We add the prev events that we have seen to the latest
+ # list to ensure the remote server doesn't give them to us
+ latest = set(latest_list)
+ latest |= seen
+
+ logger.info(
+ "Requesting missing events between %s and %s",
+ shortstr(latest),
+ event_id,
+ )
+
+ # XXX: we set timeout to 10s to help workaround
+ # https://github.com/matrix-org/synapse/issues/1733.
+ # The reason is to avoid holding the linearizer lock
+ # whilst processing inbound /send transactions, causing
+ # FDs to stack up and block other inbound transactions
+ # which empirically can currently take up to 30 minutes.
+ #
+ # N.B. this explicitly disables retry attempts.
+ #
+ # N.B. this also increases our chances of falling back to
+ # fetching fresh state for the room if the missing event
+ # can't be found, which slightly reduces our security.
+ # it may also increase our DAG extremity count for the room,
+ # causing additional state resolution? See #1760.
+ # However, fetching state doesn't hold the linearizer lock
+ # apparently.
+ #
+ # see https://github.com/matrix-org/synapse/pull/1744
+ #
+ # ----
+ #
+ # Update richvdh 2018/09/18: There are a number of problems with timing this
+ # request out aggressively on the client side:
+ #
+ # - it plays badly with the server-side rate-limiter, which starts tarpitting you
+ # if you send too many requests at once, so you end up with the server carefully
+ # working through the backlog of your requests, which you have already timed
+ # out.
+ #
+ # - for this request in particular, we now (as of
+ # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
+    #   server can't produce a plausible-looking set of prev_events - so we become
+ # much more likely to reject the event.
+ #
+ # - contrary to what it says above, we do *not* fall back to fetching fresh state
+ # for the room if get_missing_events times out. Rather, we give up processing
+ # the PDU whose prevs we are missing, which then makes it much more likely that
+ # we'll end up back here for the *next* PDU in the list, which exacerbates the
+ # problem.
+ #
+ # - the aggressive 10s timeout was introduced to deal with incoming federation
+ # requests taking 8 hours to process. It's not entirely clear why that was going
+ # on; certainly there were other issues causing traffic storms which are now
+ # resolved, and I think in any case we may be more sensible about our locking
+ # now. We're *certainly* more sensible about our logging.
+ #
+ # All that said: Let's try increasing the timeout to 60s and see what happens.
+
+ try:
+ missing_events = await self.federation_client.get_missing_events(
+ origin,
+ room_id,
+ earliest_events_ids=list(latest),
+ latest_events=[pdu],
+ limit=10,
+ min_depth=min_depth,
+ timeout=60000,
+ )
+ except (RequestSendFailed, HttpResponseException, NotRetryingDestination) as e:
+ # We failed to get the missing events, but since we need to handle
+ # the case of `get_missing_events` not returning the necessary
+ # events anyway, it is safe to simply log the error and continue.
+ logger.warning("Failed to get prev_events: %s", e)
+ return
+
+ logger.info("Got %d prev_events", len(missing_events))
+ await self._process_pulled_events(origin, missing_events, backfilled=False)
+
+ async def _process_pulled_events(
+ self, origin: str, events: Iterable[EventBase], backfilled: bool
+ ) -> None:
+ """Process a batch of events we have pulled from a remote server
+
+ Pulls in any events required to auth the events, persists the received events,
+ and notifies clients, if appropriate.
+
+ Assumes the events have already had their signatures and hashes checked.
+
+        Args:
+ origin: The server we received these events from
+ events: The received events.
+ backfilled: True if this is part of a historical batch of events (inhibits
+ notification to clients, and validation of device keys.)
+ """
+
+ # We want to sort these by depth so we process them and
+ # tell clients about them in order.
+ sorted_events = sorted(events, key=lambda x: x.depth)
+
+ for ev in sorted_events:
+ with nested_logging_context(ev.event_id):
+ await self._process_pulled_event(origin, ev, backfilled=backfilled)
+
+ async def _process_pulled_event(
+ self, origin: str, event: EventBase, backfilled: bool
+ ) -> None:
+ """Process a single event that we have pulled from a remote server
+
+ Pulls in any events required to auth the event, persists the received event,
+ and notifies clients, if appropriate.
+
+ Assumes the event has already had its signatures and hashes checked.
+
+        This is broadly equivalent to on_receive_pdu, but applies somewhat different
+        logic in the case that we are missing prev_events (in particular, it just
+        requests the state at that point, rather than triggering a get_missing_events) -
+        so it is appropriate when we have pulled the event from a remote server, rather
+        than having it pushed to us.
+
+        Args:
+            origin: The server we received this event from
+            event: The received event
+ backfilled: True if this is part of a historical batch of events (inhibits
+ notification to clients, and validation of device keys.)
+ """
+ logger.info("Processing pulled event %s", event)
+
+ # these should not be outliers.
+ assert not event.internal_metadata.is_outlier()
+
+ event_id = event.event_id
+
+ existing = await self.store.get_event(
+ event_id, allow_none=True, allow_rejected=True
+ )
+ if existing:
+ if not existing.internal_metadata.is_outlier():
+ logger.info(
+ "Ignoring received event %s which we have already seen",
+ event_id,
+ )
+ return
+ logger.info("De-outliering event %s", event_id)
+
+ try:
+ self._sanity_check_event(event)
+ except SynapseError as err:
+ logger.warning("Event %s failed sanity check: %s", event_id, err)
+ return
+
+ try:
+ state = await self._resolve_state_at_missing_prevs(origin, event)
+ await self._process_received_pdu(
+ origin, event, state=state, backfilled=backfilled
+ )
+ except FederationError as e:
+ if e.code == 403:
+ logger.warning("Pulled event %s failed history check.", event_id)
+ else:
+ raise
+
+ async def _resolve_state_at_missing_prevs(
+ self, dest: str, event: EventBase
+ ) -> Optional[Iterable[EventBase]]:
+ """Calculate the state at an event with missing prev_events.
+
+ This is used when we have pulled a batch of events from a remote server, and
+ still don't have all the prev_events.
+
+ If we already have all the prev_events for `event`, this method does nothing.
+
+ Otherwise, the missing prevs become new backwards extremities, and we fall back
+ to asking the remote server for the state after each missing `prev_event`,
+ and resolving across them.
+
+ That's ok provided we then resolve the state against other bits of the DAG
+ before using it - in other words, that the received event `event` is not going
+ to become the only forwards_extremity in the room (which will ensure that you
+ can't just take over a room by sending an event, withholding its prev_events,
+ and declaring yourself to be an admin in the subsequent state request).
+
+ In other words: we should only call this method if `event` has been *pulled*
+ as part of a batch of missing prev events, or similar.
+
+        Args:
+ dest: the remote server to ask for state at the missing prevs. Typically,
+ this will be the server we got `event` from.
+ event: an event to check for missing prevs.
+
+ Returns:
+ if we already had all the prev events, `None`. Otherwise, returns a list of
+ the events in the state at `event`.
+ """
+ room_id = event.room_id
+ event_id = event.event_id
+
+ prevs = set(event.prev_event_ids())
+ seen = await self.store.have_events_in_timeline(prevs)
+ missing_prevs = prevs - seen
+
+ if not missing_prevs:
+ return None
+
+ logger.info(
+ "Event %s is missing prev_events %s: calculating state for a "
+ "backwards extremity",
+ event_id,
+ shortstr(missing_prevs),
+ )
+ # Calculate the state after each of the previous events, and
+ # resolve them to find the correct state at the current event.
+ event_map = {event_id: event}
+ try:
+ # Get the state of the events we know about
+ ours = await self.state_store.get_state_groups_ids(room_id, seen)
+
+ # state_maps is a list of mappings from (type, state_key) to event_id
+ state_maps: List[StateMap[str]] = list(ours.values())
+
+ # we don't need this any more, let's delete it.
+ del ours
+
+ # Ask the remote server for the states we don't
+ # know about
+ for p in missing_prevs:
+ logger.info("Requesting state after missing prev_event %s", p)
+
+ with nested_logging_context(p):
+ # note that if any of the missing prevs share missing state or
+ # auth events, the requests to fetch those events are deduped
+ # by the get_pdu_cache in federation_client.
+ remote_state = await self._get_state_after_missing_prev_event(
+ dest, room_id, p
+ )
+
+ remote_state_map = {
+ (x.type, x.state_key): x.event_id for x in remote_state
+ }
+ state_maps.append(remote_state_map)
+
+ for x in remote_state:
+ event_map[x.event_id] = x
+
+ room_version = await self.store.get_room_version_id(room_id)
+ state_map = await self._state_resolution_handler.resolve_events_with_store(
+ room_id,
+ room_version,
+ state_maps,
+ event_map,
+ state_res_store=StateResolutionStore(self.store),
+ )
+
+ # We need to give _process_received_pdu the actual state events
+ # rather than event ids, so generate that now.
+
+ # First though we need to fetch all the events that are in
+ # state_map, so we can build up the state below.
+ evs = await self.store.get_events(
+ list(state_map.values()),
+ get_prev_content=False,
+ redact_behaviour=EventRedactBehaviour.AS_IS,
+ )
+ event_map.update(evs)
+
+ state = [event_map[e] for e in state_map.values()]
+ except Exception:
+ logger.warning(
+ "Error attempting to resolve state at missing prev_events",
+ exc_info=True,
+ )
+ raise FederationError(
+ "ERROR",
+ 403,
+ "We can't get valid state history.",
+ affected=event_id,
+ )
+ return state
+
+ async def _get_state_after_missing_prev_event(
+ self,
+ destination: str,
+ room_id: str,
+ event_id: str,
+ ) -> List[EventBase]:
+ """Requests all of the room state at a given event from a remote homeserver.
+
+ Args:
+ destination: The remote homeserver to query for the state.
+ room_id: The id of the room we're interested in.
+ event_id: The id of the event we want the state at.
+
+ Returns:
+ A list of events in the state, including the event itself
+ """
+ (
+ state_event_ids,
+ auth_event_ids,
+ ) = await self.federation_client.get_room_state_ids(
+ destination, room_id, event_id=event_id
+ )
+
+ logger.debug(
+ "state_ids returned %i state events, %i auth events",
+ len(state_event_ids),
+ len(auth_event_ids),
+ )
+
+ # start by just trying to fetch the events from the store
+ desired_events = set(state_event_ids)
+ desired_events.add(event_id)
+ logger.debug("Fetching %i events from cache/store", len(desired_events))
+ fetched_events = await self.store.get_events(
+ desired_events, allow_rejected=True
+ )
+
+ missing_desired_events = desired_events - fetched_events.keys()
+ logger.debug(
+ "We are missing %i events (got %i)",
+ len(missing_desired_events),
+ len(fetched_events),
+ )
+
+ # We probably won't need most of the auth events, so let's just check which
+ # we have for now, rather than thrashing the event cache with them all
+ # unnecessarily.
+
+ # TODO: we probably won't actually need all of the auth events, since we
+ # already have a bunch of the state events. It would be nice if the
+ # federation api gave us a way of finding out which we actually need.
+
+ missing_auth_events = set(auth_event_ids) - fetched_events.keys()
+ missing_auth_events.difference_update(
+ await self.store.have_seen_events(room_id, missing_auth_events)
+ )
+ logger.debug("We are also missing %i auth events", len(missing_auth_events))
+
+ missing_events = missing_desired_events | missing_auth_events
+ logger.debug("Fetching %i events from remote", len(missing_events))
+ await self._get_events_and_persist(
+ destination=destination, room_id=room_id, events=missing_events
+ )
+
+ # we need to make sure we re-load from the database to get the rejected
+ # state correct.
+ fetched_events.update(
+ await self.store.get_events(missing_desired_events, allow_rejected=True)
+ )
+
+ # check for events which were in the wrong room.
+ #
+ # this can happen if a remote server claims that the state or
+ # auth_events at an event in room A are actually events in room B
+
+ bad_events = [
+ (event_id, event.room_id)
+ for event_id, event in fetched_events.items()
+ if event.room_id != room_id
+ ]
+
+ for bad_event_id, bad_room_id in bad_events:
+ # This is a bogus situation, but since we may only discover it a long time
+ # after it happened, we try our best to carry on, by just omitting the
+ # bad events from the returned state set.
+ logger.warning(
+ "Remote server %s claims event %s in room %s is an auth/state "
+ "event in room %s",
+ destination,
+ bad_event_id,
+ bad_room_id,
+ room_id,
+ )
+
+ del fetched_events[bad_event_id]
+
+ # if we couldn't get the prev event in question, that's a problem.
+ remote_event = fetched_events.get(event_id)
+ if not remote_event:
+ raise Exception("Unable to get missing prev_event %s" % (event_id,))
+
+ # missing state at that event is a warning, not a blocker
+ # XXX: this doesn't sound right? it means that we'll end up with incomplete
+ # state.
+ failed_to_fetch = desired_events - fetched_events.keys()
+ if failed_to_fetch:
+ logger.warning(
+ "Failed to fetch missing state events for %s %s",
+ event_id,
+ failed_to_fetch,
+ )
+
+ remote_state = [
+ fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events
+ ]
+
+ if remote_event.is_state() and remote_event.rejected_reason is None:
+ remote_state.append(remote_event)
+
+ return remote_state
+
+ async def _process_received_pdu(
+ self,
+ origin: str,
+ event: EventBase,
+ state: Optional[Iterable[EventBase]],
+ backfilled: bool = False,
+ ) -> None:
+ """Called when we have a new pdu. We need to do auth checks and put it
+ through the StateHandler.
+
+ Args:
+ origin: server sending the event
+
+ event: event to be persisted
+
+ state: Normally None, but if we are handling a gap in the graph
+ (ie, we are missing one or more prev_events), the resolved state at the
+ event
+
+ backfilled: True if this is part of a historical batch of events (inhibits
+ notification to clients, and validation of device keys.)
+ """
+ logger.debug("Processing event: %s", event)
+
+ try:
+ context = await self.state_handler.compute_event_context(
+ event, old_state=state
+ )
+ await self._auth_and_persist_event(
+ origin, event, context, state=state, backfilled=backfilled
+ )
+ except AuthError as e:
+ raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
+
+ if backfilled:
+ return
+
+ # For encrypted messages we check that we know about the sending device,
+ # if we don't then we mark the device cache for that user as stale.
+ if event.type == EventTypes.Encrypted:
+ device_id = event.content.get("device_id")
+ sender_key = event.content.get("sender_key")
+
+ cached_devices = await self.store.get_cached_devices_for_user(event.sender)
+
+ resync = False # Whether we should resync device lists.
+
+ device = None
+ if device_id is not None:
+ device = cached_devices.get(device_id)
+ if device is None:
+ logger.info(
+ "Received event from remote device not in our cache: %s %s",
+ event.sender,
+ device_id,
+ )
+ resync = True
+
+ # We also check if the `sender_key` matches what we expect.
+ if sender_key is not None:
+ # Figure out what sender key we're expecting. If we know the
+ # device and recognize the algorithm then we can work out the
+ # exact key to expect. Otherwise check it matches any key we
+ # have for that device.
+
+ current_keys: Container[str] = []
+
+ if device:
+ keys = device.get("keys", {}).get("keys", {})
+
+ if (
+ event.content.get("algorithm")
+ == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
+ ):
+ # For this algorithm we expect a curve25519 key.
+ key_name = "curve25519:%s" % (device_id,)
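+                        # e.g. "curve25519:JLAFKJWSCS" (illustrative device_id)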
+ current_keys = [keys.get(key_name)]
+ else:
+                        # We don't understand the algorithm, so we just check
+                        # that it matches a key for the device.
+ current_keys = keys.values()
+ elif device_id:
+ # We don't have any keys for the device ID.
+ pass
+ else:
+ # The event didn't include a device ID, so we just look for
+ # keys across all devices.
+ current_keys = [
+ key
+ for device in cached_devices.values()
+ for key in device.get("keys", {}).get("keys", {}).values()
+ ]
+
+ # We now check that the sender key matches (one of) the expected
+ # keys.
+ if sender_key not in current_keys:
+ logger.info(
+ "Received event from remote device with unexpected sender key: %s %s: %s",
+ event.sender,
+ device_id or "<no device_id>",
+ sender_key,
+ )
+ resync = True
+
+ if resync:
+ run_as_background_process(
+ "resync_device_due_to_pdu",
+ self._resync_device,
+ event.sender,
+ )
+
+ await self._handle_marker_event(origin, event)
+
+ async def _resync_device(self, sender: str) -> None:
+ """We have detected that the device list for the given user may be out
+ of sync, so we try and resync them.
+ """
+
+ try:
+ await self.store.mark_remote_user_device_cache_as_stale(sender)
+
+ # Immediately attempt a resync in the background
+ if self.config.worker_app:
+ await self._user_device_resync(user_id=sender)
+ else:
+ await self._device_list_updater.user_device_resync(sender)
+ except Exception:
+ logger.exception("Failed to resync device for %s", sender)
+
+    async def _handle_marker_event(
+        self, origin: str, marker_event: EventBase
+    ) -> None:
+ """Handles backfilling the insertion event when we receive a marker
+ event that points to one.
+
+ Args:
+ origin: Origin of the event. Will be called to get the insertion event
+ marker_event: The event to process
+ """
+
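+        # Illustrative shape of a marker event (hedged sketch: the content key is
+        # shown symbolically via EventContentFields, and the event id is made up):
+        #
+        #     type: EventTypes.MSC2716_MARKER
+        #     content: {EventContentFields.MSC2716_MARKER_INSERTION: "$insertion_event"}
+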
+ if marker_event.type != EventTypes.MSC2716_MARKER:
+ # Not a marker event
+ return
+
+ if marker_event.rejected_reason is not None:
+ # Rejected event
+ return
+
+ # Skip processing a marker event if the room version doesn't
+ # support it.
+ room_version = await self.store.get_room_version(marker_event.room_id)
+ if not room_version.msc2716_historical:
+ return
+
+ logger.debug("_handle_marker_event: received %s", marker_event)
+
+ insertion_event_id = marker_event.content.get(
+ EventContentFields.MSC2716_MARKER_INSERTION
+ )
+
+ if insertion_event_id is None:
+ # Nothing to retrieve then (invalid marker)
+ return
+
+ logger.debug(
+ "_handle_marker_event: backfilling insertion event %s", insertion_event_id
+ )
+
+ await self._get_events_and_persist(
+ origin,
+ marker_event.room_id,
+ [insertion_event_id],
+ )
+
+ insertion_event = await self.store.get_event(
+ insertion_event_id, allow_none=True
+ )
+ if insertion_event is None:
+ logger.warning(
+ "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
+ origin,
+ insertion_event_id,
+ marker_event.event_id,
+ )
+ return
+
+ logger.debug(
+ "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
+ insertion_event,
+ marker_event,
+ )
+
+ await self.store.insert_insertion_extremity(
+ insertion_event_id, marker_event.room_id
+ )
+
+ logger.debug(
+ "_handle_marker_event: insertion extremity added for %s from marker event %s",
+ insertion_event,
+ marker_event,
+ )
+
+ async def _get_events_and_persist(
+ self, destination: str, room_id: str, events: Iterable[str]
+ ) -> None:
+ """Fetch the given events from a server, and persist them as outliers.
+
+ This function *does not* recursively get missing auth events of the
+ newly fetched events. Callers must include in the `events` argument
+ any missing events from the auth chain.
+
+ Logs a warning if we can't find the given event.
+ """
+
+ room_version = await self.store.get_room_version(room_id)
+
+ event_map: Dict[str, EventBase] = {}
+
+ async def get_event(event_id: str):
+ with nested_logging_context(event_id):
+ try:
+ event = await self.federation_client.get_pdu(
+ [destination],
+ event_id,
+ room_version,
+ outlier=True,
+ )
+ if event is None:
+ logger.warning(
+ "Server %s didn't return event %s",
+ destination,
+ event_id,
+ )
+ return
+
+ event_map[event.event_id] = event
+
+ except Exception as e:
+ logger.warning(
+ "Error fetching missing state/auth event %s: %s %s",
+ event_id,
+ type(e),
+ e,
+ )
+
+ await concurrently_execute(get_event, events, 5)
+
+ # Make a map of auth events for each event. We do this after fetching
+ # all the events as some of the events' auth events will be in the list
+ # of requested events.
+
+ auth_events = [
+ aid
+ for event in event_map.values()
+ for aid in event.auth_event_ids()
+ if aid not in event_map
+ ]
+ persisted_events = await self.store.get_events(
+ auth_events,
+ allow_rejected=True,
+ )
+
+ event_infos = []
+ for event in event_map.values():
+ auth = {}
+ for auth_event_id in event.auth_event_ids():
+ ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
+ if ae:
+ auth[(ae.type, ae.state_key)] = ae
+ else:
+ logger.info("Missing auth event %s", auth_event_id)
+
+ event_infos.append(_NewEventInfo(event, auth))
+
+ if event_infos:
+ await self._auth_and_persist_events(
+ destination,
+ room_id,
+ event_infos,
+ )
+
+ async def _auth_and_persist_events(
+ self,
+ origin: str,
+ room_id: str,
+ event_infos: Collection[_NewEventInfo],
+ ) -> None:
+ """Creates the appropriate contexts and persists events. The events
+ should not depend on one another, e.g. this should be used to persist
+ a bunch of outliers, but not a chunk of individual events that depend
+ on each other for state calculations.
+
+ Notifies about the events where appropriate.
+ """
+
+ if not event_infos:
+ return
+
+ async def prep(ev_info: _NewEventInfo):
+ event = ev_info.event
+ with nested_logging_context(suffix=event.event_id):
+ res = await self.state_handler.compute_event_context(event)
+ res = await self._check_event_auth(
+ origin,
+ event,
+ res,
+ claimed_auth_event_map=ev_info.claimed_auth_event_map,
+ )
+ return res
+
+ contexts = await make_deferred_yieldable(
+ defer.gatherResults(
+ [run_in_background(prep, ev_info) for ev_info in event_infos],
+ consumeErrors=True,
+ )
+ )
+
+ await self.persist_events_and_notify(
+ room_id,
+ [
+ (ev_info.event, context)
+ for ev_info, context in zip(event_infos, contexts)
+ ],
+ )
+
+ async def _auth_and_persist_event(
+ self,
+ origin: str,
+ event: EventBase,
+ context: EventContext,
+ state: Optional[Iterable[EventBase]] = None,
+ claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
+ backfilled: bool = False,
+ ) -> None:
+ """
+ Process an event by performing auth checks and then persisting to the database.
+
+ Args:
+ origin: The host the event originates from.
+ event: The event itself.
+ context:
+ The event context.
+
+ state:
+ The state events used to check the event for soft-fail. If this is
+ not provided the current state events will be used.
+
+ claimed_auth_event_map:
+ A map of (type, state_key) => event for the event's claimed auth_events.
+ Possibly incomplete, and possibly including events that are not yet
+ persisted, or authed, or in the right room.
+
+ Only populated where we may not already have persisted these events -
+ for example, when populating outliers.
+
+ backfilled: True if the event was backfilled.
+ """
+ context = await self._check_event_auth(
+ origin,
+ event,
+ context,
+ state=state,
+ claimed_auth_event_map=claimed_auth_event_map,
+ backfilled=backfilled,
+ )
+
+ await self._run_push_actions_and_persist_event(event, context, backfilled)
+
+ async def _check_event_auth(
+ self,
+ origin: str,
+ event: EventBase,
+ context: EventContext,
+ state: Optional[Iterable[EventBase]] = None,
+ claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
+ backfilled: bool = False,
+ ) -> EventContext:
+ """
+ Checks whether an event should be rejected (for failing auth checks).
+
+ Args:
+ origin: The host the event originates from.
+ event: The event itself.
+ context:
+ The event context.
+
+ state:
+ The state events used to check the event for soft-fail. If this is
+ not provided the current state events will be used.
+
+ claimed_auth_event_map:
+ A map of (type, state_key) => event for the event's claimed auth_events.
+ Possibly incomplete, and possibly including events that are not yet
+ persisted, or authed, or in the right room.
+
+ Only populated where we may not already have persisted these events -
+ for example, when populating outliers, or the state for a backwards
+ extremity.
+
+ backfilled: True if the event was backfilled.
+
+ Returns:
+ The updated context object.
+ """
+ room_version = await self.store.get_room_version_id(event.room_id)
+ room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+ if claimed_auth_event_map:
+ # if we have a copy of the auth events from the event, use that as the
+ # basis for auth.
+ auth_events = claimed_auth_event_map
+ else:
+ # otherwise, we calculate what the auth events *should* be, and use that
+ prev_state_ids = await context.get_prev_state_ids()
+ auth_events_ids = self._event_auth_handler.compute_auth_events(
+ event, prev_state_ids, for_verification=True
+ )
+ auth_events_x = await self.store.get_events(auth_events_ids)
+ auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
+
+ try:
+ (
+ context,
+ auth_events_for_auth,
+ ) = await self._update_auth_events_and_context_for_auth(
+ origin, event, context, auth_events
+ )
+ except Exception:
+            # We don't really mind if the above fails, so let's not fail
+ # processing if it does. However, it really shouldn't fail so
+ # let's still log as an exception since we'll still want to fix
+ # any bugs.
+ logger.exception(
+ "Failed to double check auth events for %s with remote. "
+ "Ignoring failure and continuing processing of event.",
+ event.event_id,
+ )
+ auth_events_for_auth = auth_events
+
+ try:
+ event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth)
+ except AuthError as e:
+ logger.warning("Failed auth resolution for %r because %s", event, e)
+ context.rejected = RejectedReason.AUTH_ERROR
+
+ if not context.rejected:
+ await self._check_for_soft_fail(event, state, backfilled, origin=origin)
+
+ if event.type == EventTypes.GuestAccess and not context.rejected:
+ await self.maybe_kick_guest_users(event)
+
+        # If we are going to send this event over federation we precalculate
+ # the joined hosts.
+ if event.internal_metadata.get_send_on_behalf_of():
+ await self.event_creation_handler.cache_joined_hosts_for_event(
+ event, context
+ )
+
+ return context
+
+ async def _check_for_soft_fail(
+ self,
+ event: EventBase,
+ state: Optional[Iterable[EventBase]],
+ backfilled: bool,
+ origin: str,
+ ) -> None:
+ """Checks if we should soft fail the event; if so, marks the event as
+ such.
+
+ Args:
+ event
+ state: The state at the event if we don't have all the event's prev events
+ backfilled: Whether the event is from backfill
+ origin: The host the event originates from.
+ """
+ # For new (non-backfilled and non-outlier) events we check if the event
+ # passes auth based on the current state. If it doesn't then we
+ # "soft-fail" the event.
+ if backfilled or event.internal_metadata.is_outlier():
+ return
+
+ extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id)
+ extrem_ids = set(extrem_ids_list)
+ prev_event_ids = set(event.prev_event_ids())
+
+ if extrem_ids == prev_event_ids:
+ # If they're the same then the current state is the same as the
+ # state at the event, so no point rechecking auth for soft fail.
+ return
+
+ room_version = await self.store.get_room_version_id(event.room_id)
+ room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+ # Calculate the "current state".
+ if state is not None:
+ # If we're explicitly given the state then we won't have all the
+ # prev events, and so we have a gap in the graph. In this case
+ # we want to be a little careful as we might have been down for
+ # a while and have an incorrect view of the current state,
+ # however we still want to do checks as gaps are easy to
+ # maliciously manufacture.
+ #
+ # So we use a "current state" that is actually a state
+ # resolution across the current forward extremities and the
+ # given state at the event. This should correctly handle cases
+ # like bans, especially with state res v2.
+
+ state_sets_d = await self.state_store.get_state_groups(
+ event.room_id, extrem_ids
+ )
+ state_sets: List[Iterable[EventBase]] = list(state_sets_d.values())
+ state_sets.append(state)
+ current_states = await self.state_handler.resolve_events(
+ room_version, state_sets, event
+ )
+ current_state_ids: StateMap[str] = {
+ k: e.event_id for k, e in current_states.items()
+ }
+ else:
+ current_state_ids = await self.state_handler.get_current_state_ids(
+ event.room_id, latest_event_ids=extrem_ids
+ )
+
+ logger.debug(
+ "Doing soft-fail check for %s: state %s",
+ event.event_id,
+ current_state_ids,
+ )
+
+ # Now check if the event passes auth against said current state
+ auth_types = auth_types_for_event(room_version_obj, event)
+ current_state_ids_list = [
+ e for k, e in current_state_ids.items() if k in auth_types
+ ]
+
+ auth_events_map = await self.store.get_events(current_state_ids_list)
+ current_auth_events = {
+ (e.type, e.state_key): e for e in auth_events_map.values()
+ }
+
+ try:
+ event_auth.check(room_version_obj, event, auth_events=current_auth_events)
+ except AuthError as e:
+ logger.warning(
+ "Soft-failing %r (from %s) because %s",
+ event,
+ origin,
+ e,
+ extra={
+ "room_id": event.room_id,
+ "mxid": event.sender,
+ "hs": origin,
+ },
+ )
+ soft_failed_event_counter.inc()
+ event.internal_metadata.soft_failed = True
+
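
As a conceptual summary of the two checks above: an event that fails auth at its own position in the DAG is rejected outright, while one that only fails against the *current* state is soft-failed, i.e. persisted with `internal_metadata.soft_failed = True` and (per the usual soft-failure semantics) not served to clients. A minimal sketch of that decision, with stand-in booleans rather than real `event_auth.check` calls:

    def classify_event(passes_auth_at_event: bool, passes_auth_at_current_state: bool) -> str:
        if not passes_auth_at_event:
            # _check_event_auth sets context.rejected = RejectedReason.AUTH_ERROR
            return "rejected"
        if not passes_auth_at_current_state:
            # _check_for_soft_fail sets event.internal_metadata.soft_failed = True
            return "soft-failed"
        return "accepted"

    assert classify_event(True, False) == "soft-failed"
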
+ async def _update_auth_events_and_context_for_auth(
+ self,
+ origin: str,
+ event: EventBase,
+ context: EventContext,
+ input_auth_events: StateMap[EventBase],
+ ) -> Tuple[EventContext, StateMap[EventBase]]:
+ """Helper for _check_event_auth. See there for docs.
+
+ Checks whether a given event has the expected auth events. If it
+ doesn't then we talk to the remote server to compare state to see if
+ we can come to a consensus (e.g. if one server missed some valid
+ state).
+
+ This attempts to resolve any potential divergence of state between
+ servers, but is not essential and so failures should not block further
+ processing of the event.
+
+ Args:
+ origin:
+ event:
+ context:
+
+ input_auth_events:
+ Map from (event_type, state_key) to event
+
+ Normally this is our calculated auth_events based on the state of the
+ room at the event's position in the DAG, though occasionally (eg if
+ the event is an outlier) it may be the auth events claimed by the
+ remote server.
+
+ Returns:
+ updated context, updated auth event map
+ """
+ # take a copy of input_auth_events before we modify it.
+ auth_events: MutableStateMap[EventBase] = dict(input_auth_events)
+
+ event_auth_events = set(event.auth_event_ids())
+
+ # missing_auth is the set of the event's auth_events which we don't yet have
+ # in auth_events.
+ missing_auth = event_auth_events.difference(
+ e.event_id for e in auth_events.values()
+ )
+
+ # if we have missing events, we need to fetch those events from somewhere.
+ #
+ # we start by checking if they are in the store, and then try calling /event_auth/.
+ if missing_auth:
+ have_events = await self.store.have_seen_events(event.room_id, missing_auth)
+ logger.debug("Events %s are in the store", have_events)
+ missing_auth.difference_update(have_events)
+
+ if missing_auth:
+ # If we don't have all the auth events, we need to get them.
+ logger.info("auth_events contains unknown events: %s", missing_auth)
+ try:
+ try:
+ remote_auth_chain = await self.federation_client.get_event_auth(
+ origin, event.room_id, event.event_id
+ )
+ except RequestSendFailed as e1:
+ # The other side isn't around or doesn't implement the
+ # endpoint, so let's just bail out.
+ logger.info("Failed to get event auth from remote: %s", e1)
+ return context, auth_events
+
+ seen_remotes = await self.store.have_seen_events(
+ event.room_id, [e.event_id for e in remote_auth_chain]
+ )
+
+ for e in remote_auth_chain:
+ if e.event_id in seen_remotes:
+ continue
+
+ if e.event_id == event.event_id:
+ continue
+
+ try:
+ auth_ids = e.auth_event_ids()
+ auth = {
+ (e.type, e.state_key): e
+ for e in remote_auth_chain
+ if e.event_id in auth_ids or e.type == EventTypes.Create
+ }
+ e.internal_metadata.outlier = True
+
+ logger.debug(
+ "_check_event_auth %s missing_auth: %s",
+ event.event_id,
+ e.event_id,
+ )
+ missing_auth_event_context = (
+ await self.state_handler.compute_event_context(e)
+ )
+ await self._auth_and_persist_event(
+ origin,
+ e,
+ missing_auth_event_context,
+ claimed_auth_event_map=auth,
+ )
+
+ if e.event_id in event_auth_events:
+ auth_events[(e.type, e.state_key)] = e
+ except AuthError:
+ pass
+
+ except Exception:
+ logger.exception("Failed to get auth chain")
+
+ if event.internal_metadata.is_outlier():
+ # XXX: given that, for an outlier, we'll be working with the
+ # event's *claimed* auth events rather than those we calculated:
+ # (a) is there any point in this test, since different_auth below will
+ # obviously be empty
+ # (b) alternatively, why don't we do it earlier?
+ logger.info("Skipping auth_event fetch for outlier")
+ return context, auth_events
+
+ different_auth = event_auth_events.difference(
+ e.event_id for e in auth_events.values()
+ )
+
+ if not different_auth:
+ return context, auth_events
+
+ logger.info(
+ "auth_events refers to events which are not in our calculated auth "
+ "chain: %s",
+ different_auth,
+ )
+
+ # XXX: currently this checks for redactions but I'm not convinced that is
+ # necessary?
+ different_events = await self.store.get_events_as_list(different_auth)
+
+ for d in different_events:
+ if d.room_id != event.room_id:
+ logger.warning(
+ "Event %s refers to auth_event %s which is in a different room",
+ event.event_id,
+ d.event_id,
+ )
+
+ # don't attempt to resolve the claimed auth events against our own
+ # in this case: just use our own auth events.
+ #
+ # XXX: should we reject the event in this case? It feels like we should,
+ # but then shouldn't we also do so if we've failed to fetch any of the
+ # auth events?
+ return context, auth_events
+
+ # now we state-resolve between our own idea of the auth events, and the remote's
+ # idea of them.
+
+ local_state = auth_events.values()
+ remote_auth_events = dict(auth_events)
+ remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
+ remote_state = remote_auth_events.values()
+
+ room_version = await self.store.get_room_version_id(event.room_id)
+ new_state = await self.state_handler.resolve_events(
+ room_version, (local_state, remote_state), event
+ )
+
+ logger.info(
+ "After state res: updating auth_events with new state %s",
+ {
+ (d.type, d.state_key): d.event_id
+ for d in new_state.values()
+ if auth_events.get((d.type, d.state_key)) != d
+ },
+ )
+
+ auth_events.update(new_state)
+
+ context = await self._update_context_for_auth_events(
+ event, context, auth_events
+ )
+
+ return context, auth_events
+
+ async def _update_context_for_auth_events(
+ self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
+ ) -> EventContext:
+ """Update the state_ids in an event context after auth event resolution,
+ storing the changes as a new state group.
+
+ Args:
+ event: The event we're handling the context for
+
+ context: initial event context
+
+ auth_events: Events to update in the event context.
+
+ Returns:
+ new event context
+ """
+ # exclude the state key of the new event from the current_state in the context.
+ if event.is_state():
+ event_key: Optional[Tuple[str, str]] = (event.type, event.state_key)
+ else:
+ event_key = None
+ state_updates = {
+ k: a.event_id for k, a in auth_events.items() if k != event_key
+ }
+
+ current_state_ids = await context.get_current_state_ids()
+ current_state_ids = dict(current_state_ids) # type: ignore
+
+ current_state_ids.update(state_updates)
+
+ prev_state_ids = await context.get_prev_state_ids()
+ prev_state_ids = dict(prev_state_ids)
+
+ prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
+
+ # create a new state group as a delta from the existing one.
+ prev_group = context.state_group
+ state_group = await self.state_store.store_state_group(
+ event.event_id,
+ event.room_id,
+ prev_group=prev_group,
+ delta_ids=state_updates,
+ current_state_ids=current_state_ids,
+ )
+
+ return EventContext.with_state(
+ state_group=state_group,
+ state_group_before_event=context.state_group_before_event,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
+ )
+
+ async def _run_push_actions_and_persist_event(
+ self, event: EventBase, context: EventContext, backfilled: bool = False
+ ):
+ """Run the push actions for a received event, and persist it.
+
+ Args:
+ event: The event itself.
+ context: The event context.
+ backfilled: True if the event was backfilled.
+ """
+ try:
+ if (
+ not event.internal_metadata.is_outlier()
+ and not backfilled
+ and not context.rejected
+ and (await self.store.get_min_depth(event.room_id)) <= event.depth
+ ):
+ await self.action_generator.handle_push_actions_for_event(
+ event, context
+ )
+
+ await self.persist_events_and_notify(
+ event.room_id, [(event, context)], backfilled=backfilled
+ )
+ except Exception:
+ run_in_background(
+ self.store.remove_push_actions_from_staging, event.event_id
+ )
+ raise
+
+ async def persist_events_and_notify(
+ self,
+ room_id: str,
+ event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
+ backfilled: bool = False,
+ ) -> int:
+ """Persists events and tells the notifier/pushers about them, if
+ necessary.
+
+ Args:
+ room_id: The room ID of events being persisted.
+ event_and_contexts: Sequence of events with their associated
+ context that should be persisted. All events must belong to
+ the same room.
+ backfilled: Whether these events are a result of
+ backfilling or not
+
+ Returns:
+ The stream ID after which all events have been persisted.
+ """
+ if not event_and_contexts:
+ return self.store.get_current_events_token()
+
+ instance = self.config.worker.events_shard_config.get_instance(room_id)
+ if instance != self._instance_name:
+ # Limit the number of events sent over replication. We choose 200
+ # here as that is what we default to in `max_request_body_size(..)`
+ for batch in batch_iter(event_and_contexts, 200):
+ result = await self._send_events(
+ instance_name=instance,
+ store=self.store,
+ room_id=room_id,
+ event_and_contexts=batch,
+ backfilled=backfilled,
+ )
+ return result["max_stream_id"]
+ else:
+ assert self.storage.persistence
+
+ # Note that this returns the events that were persisted, which may not be
+ # the same as those passed in if some were deduplicated due to transaction IDs.
+ events, max_stream_token = await self.storage.persistence.persist_events(
+ event_and_contexts, backfilled=backfilled
+ )
+
+ if self._ephemeral_messages_enabled:
+ for event in events:
+ # If there's an expiry timestamp on the event, schedule its expiry.
+ self._message_handler.maybe_schedule_expiry(event)
+
+ if not backfilled: # Never notify for backfilled events
+ for event in events:
+ await self._notify_persisted_event(event, max_stream_token)
+
+ return max_stream_token.stream
+
+ async def _notify_persisted_event(
+ self, event: EventBase, max_stream_token: RoomStreamToken
+ ) -> None:
+ """Checks to see if notifier/pushers should be notified about the
+ event or not.
+
+ Args:
+ event:
+ max_stream_token: The maximum stream token returned by persist_events
+ """
+
+ extra_users = []
+ if event.type == EventTypes.Member:
+ target_user_id = event.state_key
+
+ # We notify for memberships if it's an invite for one of our
+ # users
+ if event.internal_metadata.is_outlier():
+ if event.membership != Membership.INVITE:
+ if not self.is_mine_id(target_user_id):
+ return
+
+ target_user = UserID.from_string(target_user_id)
+ extra_users.append(target_user)
+ elif event.internal_metadata.is_outlier():
+ return
+
+ # the event has been persisted so it should have a stream ordering.
+ assert event.internal_metadata.stream_ordering
+
+ event_pos = PersistedEventPosition(
+ self._instance_name, event.internal_metadata.stream_ordering
+ )
+ self.notifier.on_new_room_event(
+ event, event_pos, max_stream_token, extra_users=extra_users
+ )
+
+ def _sanity_check_event(self, ev: EventBase) -> None:
+ """
+ Do some early sanity checks of a received event
+
+ In particular, checks it doesn't have an excessive number of
+ prev_events or auth_events, which could cause a huge state resolution
+ or cascade of event fetches.
+
+ Args:
+ ev: event to be checked
+
+ Raises:
+ SynapseError if the event does not pass muster
+ """
+ if len(ev.prev_event_ids()) > 20:
+ logger.warning(
+ "Rejecting event %s which has %i prev_events",
+ ev.event_id,
+ len(ev.prev_event_ids()),
+ )
+ raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
+
+ if len(ev.auth_event_ids()) > 10:
+ logger.warning(
+ "Rejecting event %s which has %i auth_events",
+ ev.event_id,
+ len(ev.auth_event_ids()),
+ )
+ raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
+
+ async def get_min_depth_for_context(self, context: str) -> int:
+ return await self.store.get_min_depth(context)
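
On the replication path in `persist_events_and_notify` above, events are sent to the owning worker in chunks of 200 via `batch_iter`. A small illustrative stand-in for that chunking (not the actual `synapse.util.iterutils` implementation, just the assumed behaviour):

    import itertools
    from typing import Iterable, Iterator, Tuple, TypeVar

    T = TypeVar("T")

    def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
        """Yield tuples of at most `size` items from `iterable`."""
        it = iter(iterable)
        while True:
            batch = tuple(itertools.islice(it, size))
            if not batch:
                return
            yield batch

    # 450 events would be sent over replication as batches of 200, 200 and 50:
    assert [len(b) for b in batch_iter(range(450), 200)] == [200, 200, 50]
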
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e1c544a3c9..4e8f7f1d85 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -151,7 +151,7 @@ class InitialSyncHandler(BaseHandler):
limit = 10
async def handle_room(event: RoomsForUser):
- d = {
+ d: JsonDict = {
"room_id": event.room_id,
"membership": event.membership,
"visibility": (
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7ca14e1d84..4418d63df7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -353,6 +353,11 @@ class BasePresenceHandler(abc.ABC):
# otherwise would not do).
await self.set_state(UserID.from_string(user_id), state, force_notify=True)
+ async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
+ raise NotImplementedError(
+ "Attempting to check presence on a non-presence worker."
+ )
+
class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 8cf614136e..0ed59d757b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -56,6 +56,22 @@ login_counter = Counter(
)
+def init_counters_for_auth_provider(auth_provider_id: str) -> None:
+ """Ensure the prometheus counters for the given auth provider are initialised
+
+ This fixes a problem where the counters are not reported for a given auth provider
+ until the user first logs in/registers.
+ """
+ for is_guest in (True, False):
+ login_counter.labels(guest=is_guest, auth_provider=auth_provider_id)
+ for shadow_banned in (True, False):
+ registration_counter.labels(
+ guest=is_guest,
+ shadow_banned=shadow_banned,
+ auth_provider=auth_provider_id,
+ )
+
+
class LoginDict(TypedDict):
device_id: str
access_token: str
@@ -96,6 +112,8 @@ class RegistrationHandler(BaseHandler):
self.session_lifetime = hs.config.session_lifetime
self.access_token_lifetime = hs.config.access_token_lifetime
+ init_counters_for_auth_provider("")
+
async def check_username(
self,
localpart: str,
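
A note on why `init_counters_for_auth_provider` loops over every label combination: `prometheus_client` only exports a labelled child once `.labels(...)` has been called for it, so pre-creating the children makes the counters appear as 0 instead of being absent until the first login/registration. A self-contained illustration (the counter name here is hypothetical, not the one defined in this module):

    from prometheus_client import Counter, generate_latest

    example_logins = Counter(
        "example_logins", "Example login counter", ["guest", "auth_provider"]
    )

    def init_example_counters(auth_provider_id: str) -> None:
        for is_guest in (True, False):
            # .labels() creates the child metric without incrementing it, so it
            # shows up in the exported metrics with a value of 0.
            example_logins.labels(guest=is_guest, auth_provider=auth_provider_id)

    init_example_counters("oidc")
    print(generate_latest().decode())  # now includes example_logins_total{...} 0.0
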
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 585d830951..bfde4fe33c 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -36,6 +36,7 @@ from synapse.api.ratelimiting import Ratelimiter
from synapse.event_auth import get_named_level, get_power_level_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.types import (
JsonDict,
Requester,
@@ -79,7 +80,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
- self.member_linearizer = Linearizer(name="member")
+ self.member_linearizer: Linearizer = Linearizer(name="member")
self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
self.clock = hs.get_clock()
@@ -576,6 +577,20 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content.pop("displayname", None)
content.pop("avatar_url", None)
+ if len(content.get("displayname") or "") > MAX_DISPLAYNAME_LEN:
+ raise SynapseError(
+ 400,
+ f"Displayname is too long (max {MAX_DISPLAYNAME_LEN})",
+ errcode=Codes.BAD_JSON,
+ )
+
+ if len(content.get("avatar_url") or "") > MAX_AVATAR_URL_LEN:
+ raise SynapseError(
+ 400,
+ f"Avatar URL is too long (max {MAX_AVATAR_URL_LEN})",
+ errcode=Codes.BAD_JSON,
+ )
+
effective_membership_state = action
if action in ["kick", "unban"]:
effective_membership_state = "leave"
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index ac6cfc0da9..906985c754 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -28,12 +28,11 @@ from synapse.api.constants import (
Membership,
RoomTypes,
)
-from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
+from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import format_event_for_client_v2
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -76,6 +75,9 @@ class _PaginationSession:
class RoomSummaryHandler:
+ # A unique key used for pagination sessions for the room hierarchy endpoint.
+ _PAGINATION_SESSION_TYPE = "room_hierarchy_pagination"
+
# The time a pagination session remains valid for.
_PAGINATION_SESSION_VALIDITY_PERIOD_MS = 5 * 60 * 1000
@@ -87,12 +89,6 @@ class RoomSummaryHandler:
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()
- # A map of query information to the current pagination state.
- #
- # TODO Allow for multiple workers to share this data.
- # TODO Expire pagination tokens.
- self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {}
-
# If a user tries to fetch the same page multiple times in quick succession,
# only process the first attempt and return its result to subsequent requests.
self._pagination_response_cache: ResponseCache[
@@ -102,21 +98,6 @@ class RoomSummaryHandler:
"get_room_hierarchy",
)
- def _expire_pagination_sessions(self):
- """Expire pagination session which are old."""
- expire_before = (
- self._clock.time_msec() - self._PAGINATION_SESSION_VALIDITY_PERIOD_MS
- )
- to_expire = []
-
- for key, value in self._pagination_sessions.items():
- if value.creation_time_ms < expire_before:
- to_expire.append(key)
-
- for key in to_expire:
- logger.debug("Expiring pagination session id %s", key)
- del self._pagination_sessions[key]
-
async def get_space_summary(
self,
requester: str,
@@ -327,18 +308,29 @@ class RoomSummaryHandler:
# If this is continuing a previous session, pull the persisted data.
if from_token:
- self._expire_pagination_sessions()
+ try:
+ pagination_session = await self._store.get_session(
+ session_type=self._PAGINATION_SESSION_TYPE,
+ session_id=from_token,
+ )
+ except StoreError:
+ raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)
- pagination_key = _PaginationKey(
- requested_room_id, suggested_only, max_depth, from_token
- )
- if pagination_key not in self._pagination_sessions:
+ # If the requester, room ID, suggested-only, or max depth were modified,
+ # the session is invalid.
+ if (
+ requester != pagination_session["requester"]
+ or requested_room_id != pagination_session["room_id"]
+ or suggested_only != pagination_session["suggested_only"]
+ or max_depth != pagination_session["max_depth"]
+ ):
raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)
# Load the previous state.
- pagination_session = self._pagination_sessions[pagination_key]
- room_queue = pagination_session.room_queue
- processed_rooms = pagination_session.processed_rooms
+ room_queue = [
+ _RoomQueueEntry(*fields) for fields in pagination_session["room_queue"]
+ ]
+ processed_rooms = set(pagination_session["processed_rooms"])
else:
# The queue of rooms to process, the next room is last on the stack.
room_queue = [_RoomQueueEntry(requested_room_id, ())]
@@ -456,13 +448,21 @@ class RoomSummaryHandler:
# If there's additional data, generate a pagination token (and persist state).
if room_queue:
- next_batch = random_string(24)
- result["next_batch"] = next_batch
- pagination_key = _PaginationKey(
- requested_room_id, suggested_only, max_depth, next_batch
- )
- self._pagination_sessions[pagination_key] = _PaginationSession(
- self._clock.time_msec(), room_queue, processed_rooms
+ result["next_batch"] = await self._store.create_session(
+ session_type=self._PAGINATION_SESSION_TYPE,
+ value={
+ # Information which must be identical across pagination.
+ "requester": requester,
+ "room_id": requested_room_id,
+ "suggested_only": suggested_only,
+ "max_depth": max_depth,
+ # The stored state.
+ "room_queue": [
+ attr.astuple(room_entry) for room_entry in room_queue
+ ],
+ "processed_rooms": list(processed_rooms),
+ },
+ expiry_ms=self._PAGINATION_SESSION_VALIDITY_PERIOD_MS,
)
return result
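
The effect of the change above is that hierarchy pagination state now lives in the database sessions table rather than a per-process dict, so it survives restarts and can be shared across workers, with expiry handled by `expiry_ms`. A rough sketch of the round trip, assuming a `store` exposing `create_session`/`get_session` with the signatures used in this diff:

    from synapse.api.errors import StoreError

    PAGINATION_SESSION_TYPE = "room_hierarchy_pagination"

    async def save_pagination_state(store, requester, room_id, room_queue, processed_rooms):
        # The returned opaque session ID is what gets handed back as `next_batch`.
        return await store.create_session(
            session_type=PAGINATION_SESSION_TYPE,
            value={
                "requester": requester,
                "room_id": room_id,
                "room_queue": room_queue,
                "processed_rooms": processed_rooms,
            },
            expiry_ms=5 * 60 * 1000,
        )

    async def load_pagination_state(store, from_token):
        try:
            return await store.get_session(
                session_type=PAGINATION_SESSION_TYPE, session_id=from_token
            )
        except StoreError:
            # Unknown or expired token; callers turn this into a 400.
            return None
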
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 1b855a685c..0e6ebb574e 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -37,6 +37,7 @@ from twisted.web.server import Request
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, NotFoundError, RedirectException, SynapseError
from synapse.config.sso import SsoAttributeRequirement
+from synapse.handlers.register import init_counters_for_auth_provider
from synapse.handlers.ui_auth import UIAuthSessionDataConstants
from synapse.http import get_request_user_agent
from synapse.http.server import respond_with_html, respond_with_redirect
@@ -213,6 +214,7 @@ class SsoHandler:
p_id = p.idp_id
assert p_id not in self._identity_providers
self._identity_providers[p_id] = p
+ init_counters_for_auth_provider(p_id)
def get_identity_providers(self) -> Mapping[str, SsoIdentityProvider]:
"""Get the configured identity providers"""
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 590642f510..86c3c7f0df 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,4 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2018, 2019 New Vector Ltd
+# Copyright 2015-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -31,6 +30,8 @@ from prometheus_client import Counter
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.filtering import FilterCollection
+from synapse.api.presence import UserPresenceState
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
@@ -86,20 +87,20 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
SyncRequestKey = Tuple[Any, ...]
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
- user = attr.ib(type=UserID)
- filter_collection = attr.ib(type=FilterCollection)
- is_guest = attr.ib(type=bool)
- request_key = attr.ib(type=SyncRequestKey)
- device_id = attr.ib(type=Optional[str])
+ user: UserID
+ filter_collection: FilterCollection
+ is_guest: bool
+ request_key: SyncRequestKey
+ device_id: Optional[str]
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class TimelineBatch:
- prev_batch = attr.ib(type=StreamToken)
- events = attr.ib(type=List[EventBase])
- limited = attr.ib(type=bool)
+ prev_batch: StreamToken
+ events: List[EventBase]
+ limited: bool
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
@@ -113,16 +114,16 @@ class TimelineBatch:
# if there are updates for it, which we check after the instance has been created.
# This should not be a big deal because we update the notification counts afterwards as
# well anyway.
-@attr.s(slots=True)
+@attr.s(slots=True, auto_attribs=True)
class JoinedSyncResult:
- room_id = attr.ib(type=str)
- timeline = attr.ib(type=TimelineBatch)
- state = attr.ib(type=StateMap[EventBase])
- ephemeral = attr.ib(type=List[JsonDict])
- account_data = attr.ib(type=List[JsonDict])
- unread_notifications = attr.ib(type=JsonDict)
- summary = attr.ib(type=Optional[JsonDict])
- unread_count = attr.ib(type=int)
+ room_id: str
+ timeline: TimelineBatch
+ state: StateMap[EventBase]
+ ephemeral: List[JsonDict]
+ account_data: List[JsonDict]
+ unread_notifications: JsonDict
+ summary: Optional[JsonDict]
+ unread_count: int
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
@@ -138,12 +139,12 @@ class JoinedSyncResult:
)
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class ArchivedSyncResult:
- room_id = attr.ib(type=str)
- timeline = attr.ib(type=TimelineBatch)
- state = attr.ib(type=StateMap[EventBase])
- account_data = attr.ib(type=List[JsonDict])
+ room_id: str
+ timeline: TimelineBatch
+ state: StateMap[EventBase]
+ account_data: List[JsonDict]
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
@@ -152,37 +153,37 @@ class ArchivedSyncResult:
return bool(self.timeline or self.state or self.account_data)
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class InvitedSyncResult:
- room_id = attr.ib(type=str)
- invite = attr.ib(type=EventBase)
+ room_id: str
+ invite: EventBase
def __bool__(self) -> bool:
"""Invited rooms should always be reported to the client"""
return True
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class KnockedSyncResult:
- room_id = attr.ib(type=str)
- knock = attr.ib(type=EventBase)
+ room_id: str
+ knock: EventBase
def __bool__(self) -> bool:
"""Knocked rooms should always be reported to the client"""
return True
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class GroupsSyncResult:
- join = attr.ib(type=JsonDict)
- invite = attr.ib(type=JsonDict)
- leave = attr.ib(type=JsonDict)
+ join: JsonDict
+ invite: JsonDict
+ leave: JsonDict
def __bool__(self) -> bool:
return bool(self.join or self.invite or self.leave)
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class DeviceLists:
"""
Attributes:
@@ -190,27 +191,27 @@ class DeviceLists:
left: List of user_ids whose devices we no longer track
"""
- changed = attr.ib(type=Collection[str])
- left = attr.ib(type=Collection[str])
+ changed: Collection[str]
+ left: Collection[str]
def __bool__(self) -> bool:
return bool(self.changed or self.left)
-@attr.s(slots=True)
+@attr.s(slots=True, auto_attribs=True)
class _RoomChanges:
"""The set of room entries to include in the sync, plus the set of joined
and left room IDs since last sync.
"""
- room_entries = attr.ib(type=List["RoomSyncResultBuilder"])
- invited = attr.ib(type=List[InvitedSyncResult])
- knocked = attr.ib(type=List[KnockedSyncResult])
- newly_joined_rooms = attr.ib(type=List[str])
- newly_left_rooms = attr.ib(type=List[str])
+ room_entries: List["RoomSyncResultBuilder"]
+ invited: List[InvitedSyncResult]
+ knocked: List[KnockedSyncResult]
+ newly_joined_rooms: List[str]
+ newly_left_rooms: List[str]
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncResult:
"""
Attributes:
@@ -230,18 +231,18 @@ class SyncResult:
groups: Group updates, if any
"""
- next_batch = attr.ib(type=StreamToken)
- presence = attr.ib(type=List[JsonDict])
- account_data = attr.ib(type=List[JsonDict])
- joined = attr.ib(type=List[JoinedSyncResult])
- invited = attr.ib(type=List[InvitedSyncResult])
- knocked = attr.ib(type=List[KnockedSyncResult])
- archived = attr.ib(type=List[ArchivedSyncResult])
- to_device = attr.ib(type=List[JsonDict])
- device_lists = attr.ib(type=DeviceLists)
- device_one_time_keys_count = attr.ib(type=JsonDict)
- device_unused_fallback_key_types = attr.ib(type=List[str])
- groups = attr.ib(type=Optional[GroupsSyncResult])
+ next_batch: StreamToken
+ presence: List[UserPresenceState]
+ account_data: List[JsonDict]
+ joined: List[JoinedSyncResult]
+ invited: List[InvitedSyncResult]
+ knocked: List[KnockedSyncResult]
+ archived: List[ArchivedSyncResult]
+ to_device: List[JsonDict]
+ device_lists: DeviceLists
+ device_one_time_keys_count: JsonDict
+ device_unused_fallback_key_types: List[str]
+ groups: Optional[GroupsSyncResult]
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
@@ -701,7 +702,7 @@ class SyncHandler:
name_id = state_ids.get((EventTypes.Name, ""))
canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ""))
- summary = {}
+ summary: JsonDict = {}
empty_ms = MemberSummary([], 0)
# TODO: only send these when they change.
@@ -1843,6 +1844,9 @@ class SyncHandler:
knocked = []
for event in room_list:
+ if event.room_version_id not in KNOWN_ROOM_VERSIONS:
+ continue
+
if event.membership == Membership.JOIN:
room_entries.append(
RoomSyncResultBuilder(
@@ -2076,21 +2080,23 @@ class SyncHandler:
# If the membership's stream ordering is after the given stream
# ordering, we need to go and work out if the user was in the room
# before.
- for room_id, event_pos in joined_rooms:
- if not event_pos.persisted_after(room_key):
- joined_room_ids.add(room_id)
+ for joined_room in joined_rooms:
+ if not joined_room.event_pos.persisted_after(room_key):
+ joined_room_ids.add(joined_room.room_id)
continue
- logger.info("User joined room after current token: %s", room_id)
+ logger.info("User joined room after current token: %s", joined_room.room_id)
extrems = (
await self.store.get_forward_extremities_for_room_at_stream_ordering(
- room_id, event_pos.stream
+ joined_room.room_id, joined_room.event_pos.stream
)
)
- users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
+ users_in_room = await self.state.get_current_users_in_room(
+ joined_room.room_id, extrems
+ )
if user_id in users_in_room:
- joined_room_ids.add(room_id)
+ joined_room_ids.add(joined_room.room_id)
return frozenset(joined_room_ids)
@@ -2160,7 +2166,7 @@ def _calculate_state(
return {event_id_to_key[e]: e for e in state_ids}
-@attr.s(slots=True)
+@attr.s(slots=True, auto_attribs=True)
class SyncResultBuilder:
"""Used to help build up a new SyncResult for a user
@@ -2172,33 +2178,33 @@ class SyncResultBuilder:
joined_room_ids: List of rooms the user is joined to
# The following mirror the fields in a sync response
- presence (list)
- account_data (list)
- joined (list[JoinedSyncResult])
- invited (list[InvitedSyncResult])
- knocked (list[KnockedSyncResult])
- archived (list[ArchivedSyncResult])
- groups (GroupsSyncResult|None)
- to_device (list)
+ presence
+ account_data
+ joined
+ invited
+ knocked
+ archived
+ groups
+ to_device
"""
- sync_config = attr.ib(type=SyncConfig)
- full_state = attr.ib(type=bool)
- since_token = attr.ib(type=Optional[StreamToken])
- now_token = attr.ib(type=StreamToken)
- joined_room_ids = attr.ib(type=FrozenSet[str])
+ sync_config: SyncConfig
+ full_state: bool
+ since_token: Optional[StreamToken]
+ now_token: StreamToken
+ joined_room_ids: FrozenSet[str]
- presence = attr.ib(type=List[JsonDict], default=attr.Factory(list))
- account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list))
- joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list))
- invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list))
- knocked = attr.ib(type=List[KnockedSyncResult], default=attr.Factory(list))
- archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list))
- groups = attr.ib(type=Optional[GroupsSyncResult], default=None)
- to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list))
+ presence: List[UserPresenceState] = attr.Factory(list)
+ account_data: List[JsonDict] = attr.Factory(list)
+ joined: List[JoinedSyncResult] = attr.Factory(list)
+ invited: List[InvitedSyncResult] = attr.Factory(list)
+ knocked: List[KnockedSyncResult] = attr.Factory(list)
+ archived: List[ArchivedSyncResult] = attr.Factory(list)
+ groups: Optional[GroupsSyncResult] = None
+ to_device: List[JsonDict] = attr.Factory(list)
-@attr.s(slots=True)
+@attr.s(slots=True, auto_attribs=True)
class RoomSyncResultBuilder:
"""Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`.
@@ -2214,10 +2220,10 @@ class RoomSyncResultBuilder:
upto_token: Latest point to return events from.
"""
- room_id = attr.ib(type=str)
- rtype = attr.ib(type=str)
- events = attr.ib(type=Optional[List[EventBase]])
- newly_joined = attr.ib(type=bool)
- full_state = attr.ib(type=bool)
- since_token = attr.ib(type=Optional[StreamToken])
- upto_token = attr.ib(type=StreamToken)
+ room_id: str
+ rtype: str
+ events: Optional[List[EventBase]]
+ newly_joined: bool
+ full_state: bool
+ since_token: Optional[StreamToken]
+ upto_token: StreamToken
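
Most of the sync.py churn above is a mechanical conversion from explicit `attr.ib(type=...)` fields to `auto_attribs=True` annotations; both spellings produce the same attrs class. A minimal illustration with made-up class names:

    import attr

    @attr.s(slots=True, frozen=True)
    class OldStyle:
        room_id = attr.ib(type=str)
        events = attr.ib(type=list, default=attr.Factory(list))

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class NewStyle:
        room_id: str
        events: list = attr.Factory(list)

    # Same construction behaviour and defaults either way.
    assert OldStyle("!a:example.org").events == NewStyle("!a:example.org").events == []
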
diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py
index 4c3b669fae..13b0c61d2e 100644
--- a/synapse/handlers/ui_auth/__init__.py
+++ b/synapse/handlers/ui_auth/__init__.py
@@ -34,3 +34,8 @@ class UIAuthSessionDataConstants:
# used by validate_user_via_ui_auth to store the mxid of the user we are validating
# for.
REQUEST_USER_ID = "request_user_id"
+
+ # used during registration to store the registration token used (if required) so that:
+ # - we can prevent a token being used twice by one session
+ # - we can 'use up' the token after registration has successfully completed
+ REGISTRATION_TOKEN = "org.matrix.msc3231.login.registration_token"
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 5414ce77d8..d3828dec6b 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -49,7 +49,7 @@ class UserInteractiveAuthChecker:
clientip: The IP address of the client.
Raises:
- SynapseError if authentication failed
+ LoginError if authentication failed.
Returns:
The result of authentication (to pass back to the client?)
@@ -131,7 +131,9 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
)
if resp_body["success"]:
return True
- raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+ raise LoginError(
+ 401, "Captcha authentication failed", errcode=Codes.UNAUTHORIZED
+ )
class _BaseThreepidAuthChecker:
@@ -191,7 +193,9 @@ class _BaseThreepidAuthChecker:
raise AssertionError("Unrecognized threepid medium: %s" % (medium,))
if not threepid:
- raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+ raise LoginError(
+ 401, "Unable to get validated threepid", errcode=Codes.UNAUTHORIZED
+ )
if threepid["medium"] != medium:
raise LoginError(
@@ -237,11 +241,76 @@ class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
return await self._check_threepid("msisdn", authdict)
+class RegistrationTokenAuthChecker(UserInteractiveAuthChecker):
+ AUTH_TYPE = LoginType.REGISTRATION_TOKEN
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+ self.hs = hs
+ self._enabled = bool(hs.config.registration_requires_token)
+ self.store = hs.get_datastore()
+
+ def is_enabled(self) -> bool:
+ return self._enabled
+
+ async def check_auth(self, authdict: dict, clientip: str) -> Any:
+ if "token" not in authdict:
+ raise LoginError(400, "Missing registration token", Codes.MISSING_PARAM)
+ if not isinstance(authdict["token"], str):
+ raise LoginError(
+ 400, "Registration token must be a string", Codes.INVALID_PARAM
+ )
+ if "session" not in authdict:
+ raise LoginError(400, "Missing UIA session", Codes.MISSING_PARAM)
+
+ # Get these here to avoid cyclic dependencies
+ from synapse.handlers.ui_auth import UIAuthSessionDataConstants
+
+ auth_handler = self.hs.get_auth_handler()
+
+ session = authdict["session"]
+ token = authdict["token"]
+
+ # If the LoginType.REGISTRATION_TOKEN stage has already been completed,
+ # return early to avoid incrementing `pending` again.
+ stored_token = await auth_handler.get_session_data(
+ session, UIAuthSessionDataConstants.REGISTRATION_TOKEN
+ )
+ if stored_token:
+ if token != stored_token:
+ raise LoginError(
+ 400, "Registration token has changed", Codes.INVALID_PARAM
+ )
+ else:
+ return token
+
+ if await self.store.registration_token_is_valid(token):
+ # Increment pending counter, so that if token has limited uses it
+ # can't be used up by someone else in the meantime.
+ await self.store.set_registration_token_pending(token)
+ # Store the token in the UIA session, so that once registration
+ # is complete `completed` can be incremented.
+ await auth_handler.set_session_data(
+ session,
+ UIAuthSessionDataConstants.REGISTRATION_TOKEN,
+ token,
+ )
+ # The token will be stored as the result of the authentication stage
+ # in ui_auth_sessions_credentials. This allows the pending counter
+ # for tokens to be decremented when expired sessions are deleted.
+ return token
+ else:
+ raise LoginError(
+ 401, "Invalid registration token", errcode=Codes.UNAUTHORIZED
+ )
+
+
INTERACTIVE_AUTH_CHECKERS = [
DummyAuthChecker,
TermsAuthChecker,
RecaptchaAuthChecker,
EmailIdentityAuthChecker,
MsisdnAuthChecker,
+ RegistrationTokenAuthChecker,
]
"""A list of UserInteractiveAuthChecker classes"""
|