diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 6e6eaf3805..82e6475ef5 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -26,6 +26,7 @@ from synapse.api.errors import (
Codes,
InvalidClientTokenError,
MissingClientTokenError,
+ UnstableSpecAuthError,
)
from synapse.appservice import ApplicationService
from synapse.http import get_request_user_agent
@@ -106,8 +107,11 @@ class Auth:
forgot = await self.store.did_forget(user_id, room_id)
if not forgot:
return membership, member_event_id
-
- raise AuthError(403, "User %s not in room %s" % (user_id, room_id))
+ raise UnstableSpecAuthError(
+ 403,
+ "User %s not in room %s" % (user_id, room_id),
+ errcode=Codes.NOT_JOINED,
+ )
async def get_user_by_req(
self,
@@ -600,8 +604,9 @@ class Auth:
== HistoryVisibility.WORLD_READABLE
):
return Membership.JOIN, None
- raise AuthError(
+ raise UnstableSpecAuthError(
403,
"User %s not in room %s, and room previews are disabled"
% (user_id, room_id),
+ errcode=Codes.NOT_JOINED,
)
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 2653764119..789859e69e 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -268,4 +268,4 @@ class PublicRoomsFilterFields:
"""
GENERIC_SEARCH_TERM: Final = "generic_search_term"
- ROOM_TYPES: Final = "org.matrix.msc3827.room_types"
+ ROOM_TYPES: Final = "room_types"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 1c74e131f2..e6dea89c6d 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -26,6 +26,7 @@ from twisted.web import http
from synapse.util import json_decoder
if typing.TYPE_CHECKING:
+ from synapse.config.homeserver import HomeServerConfig
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@@ -80,6 +81,12 @@ class Codes(str, Enum):
INVALID_SIGNATURE = "M_INVALID_SIGNATURE"
USER_DEACTIVATED = "M_USER_DEACTIVATED"
+ # Part of MSC3848
+ # https://github.com/matrix-org/matrix-spec-proposals/pull/3848
+ ALREADY_JOINED = "ORG.MATRIX.MSC3848.ALREADY_JOINED"
+ NOT_JOINED = "ORG.MATRIX.MSC3848.NOT_JOINED"
+ INSUFFICIENT_POWER = "ORG.MATRIX.MSC3848.INSUFFICIENT_POWER"
+
# The account has been suspended on the server.
# By opposition to `USER_DEACTIVATED`, this is a reversible measure
# that can possibly be appealed and reverted.
@@ -167,7 +174,7 @@ class SynapseError(CodeMessageException):
else:
self._additional_fields = dict(additional_fields)
- def error_dict(self) -> "JsonDict":
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
return cs_error(self.msg, self.errcode, **self._additional_fields)
@@ -213,7 +220,7 @@ class ConsentNotGivenError(SynapseError):
)
self._consent_uri = consent_uri
- def error_dict(self) -> "JsonDict":
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
return cs_error(self.msg, self.errcode, consent_uri=self._consent_uri)
@@ -307,6 +314,37 @@ class AuthError(SynapseError):
super().__init__(code, msg, errcode, additional_fields)
+class UnstableSpecAuthError(AuthError):
+ """An error raised when a new error code is being proposed to replace a previous one.
+ This error will return a "org.matrix.unstable.errcode" property with the new error code,
+ with the previous error code still being defined in the "errcode" property.
+
+ This error will include `org.matrix.msc3848.unstable.errcode` in the C-S error body.
+ """
+
+ def __init__(
+ self,
+ code: int,
+ msg: str,
+ errcode: str,
+ previous_errcode: str = Codes.FORBIDDEN,
+ additional_fields: Optional[dict] = None,
+ ):
+ self.previous_errcode = previous_errcode
+ super().__init__(code, msg, errcode, additional_fields)
+
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
+ fields = {}
+ if config is not None and config.experimental.msc3848_enabled:
+ fields["org.matrix.msc3848.unstable.errcode"] = self.errcode
+ return cs_error(
+ self.msg,
+ self.previous_errcode,
+ **fields,
+ **self._additional_fields,
+ )
+
+
class InvalidClientCredentialsError(SynapseError):
"""An error raised when there was a problem with the authorisation credentials
in a client request.
@@ -338,8 +376,8 @@ class InvalidClientTokenError(InvalidClientCredentialsError):
super().__init__(msg=msg, errcode="M_UNKNOWN_TOKEN")
self._soft_logout = soft_logout
- def error_dict(self) -> "JsonDict":
- d = super().error_dict()
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
+ d = super().error_dict(config)
d["soft_logout"] = self._soft_logout
return d
@@ -362,7 +400,7 @@ class ResourceLimitError(SynapseError):
self.limit_type = limit_type
super().__init__(code, msg, errcode=errcode)
- def error_dict(self) -> "JsonDict":
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
return cs_error(
self.msg,
self.errcode,
@@ -397,7 +435,7 @@ class InvalidCaptchaError(SynapseError):
super().__init__(code, msg, errcode)
self.error_url = error_url
- def error_dict(self) -> "JsonDict":
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
return cs_error(self.msg, self.errcode, error_url=self.error_url)
@@ -414,7 +452,7 @@ class LimitExceededError(SynapseError):
super().__init__(code, msg, errcode)
self.retry_after_ms = retry_after_ms
- def error_dict(self) -> "JsonDict":
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
return cs_error(self.msg, self.errcode, retry_after_ms=self.retry_after_ms)
@@ -429,7 +467,7 @@ class RoomKeysVersionError(SynapseError):
super().__init__(403, "Wrong room_keys version", Codes.WRONG_ROOM_KEYS_VERSION)
self.current_version = current_version
- def error_dict(self) -> "JsonDict":
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
return cs_error(self.msg, self.errcode, current_version=self.current_version)
@@ -469,7 +507,7 @@ class IncompatibleRoomVersionError(SynapseError):
self._room_version = room_version
- def error_dict(self) -> "JsonDict":
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
return cs_error(self.msg, self.errcode, room_version=self._room_version)
@@ -515,7 +553,7 @@ class UnredactedContentDeletedError(SynapseError):
)
self.content_keep_ms = content_keep_ms
- def error_dict(self) -> "JsonDict":
+ def error_dict(self, config: Optional["HomeServerConfig"]) -> "JsonDict":
extra = {}
if self.content_keep_ms is not None:
extra = {"fi.mau.msc2815.content_keep_ms": self.content_keep_ms}
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index ee443cea00..c2ecd977cd 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -88,5 +88,5 @@ class ExperimentalConfig(Config):
# MSC3715: dir param on /relations.
self.msc3715_enabled: bool = experimental.get("msc3715_enabled", False)
- # MSC3827: Filtering of /publicRooms by room type
- self.msc3827_enabled: bool = experimental.get("msc3827_enabled", False)
+ # MSC3848: Introduce errcodes for specific event sending failures
+ self.msc3848_enabled: bool = experimental.get("msc3848_enabled", False)
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 965cb265da..389b0c5d53 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -30,7 +30,13 @@ from synapse.api.constants import (
JoinRules,
Membership,
)
-from synapse.api.errors import AuthError, EventSizeError, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ Codes,
+ EventSizeError,
+ SynapseError,
+ UnstableSpecAuthError,
+)
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
@@ -291,7 +297,11 @@ def check_state_dependent_auth_rules(
invite_level = get_named_level(auth_dict, "invite", 0)
if user_level < invite_level:
- raise AuthError(403, "You don't have permission to invite users")
+ raise UnstableSpecAuthError(
+ 403,
+ "You don't have permission to invite users",
+ errcode=Codes.INSUFFICIENT_POWER,
+ )
else:
logger.debug("Allowing! %s", event)
return
@@ -474,7 +484,11 @@ def _is_membership_change_allowed(
return
if not caller_in_room: # caller isn't joined
- raise AuthError(403, "%s not in room %s." % (event.user_id, event.room_id))
+ raise UnstableSpecAuthError(
+ 403,
+ "%s not in room %s." % (event.user_id, event.room_id),
+ errcode=Codes.NOT_JOINED,
+ )
if Membership.INVITE == membership:
# TODO (erikj): We should probably handle this more intelligently
@@ -484,10 +498,18 @@ def _is_membership_change_allowed(
if target_banned:
raise AuthError(403, "%s is banned from the room" % (target_user_id,))
elif target_in_room: # the target is already in the room.
- raise AuthError(403, "%s is already in the room." % target_user_id)
+ raise UnstableSpecAuthError(
+ 403,
+ "%s is already in the room." % target_user_id,
+ errcode=Codes.ALREADY_JOINED,
+ )
else:
if user_level < invite_level:
- raise AuthError(403, "You don't have permission to invite users")
+ raise UnstableSpecAuthError(
+ 403,
+ "You don't have permission to invite users",
+ errcode=Codes.INSUFFICIENT_POWER,
+ )
elif Membership.JOIN == membership:
# Joins are valid iff caller == target and:
# * They are not banned.
@@ -549,15 +571,27 @@ def _is_membership_change_allowed(
elif Membership.LEAVE == membership:
# TODO (erikj): Implement kicks.
if target_banned and user_level < ban_level:
- raise AuthError(403, "You cannot unban user %s." % (target_user_id,))
+ raise UnstableSpecAuthError(
+ 403,
+ "You cannot unban user %s." % (target_user_id,),
+ errcode=Codes.INSUFFICIENT_POWER,
+ )
elif target_user_id != event.user_id:
kick_level = get_named_level(auth_events, "kick", 50)
if user_level < kick_level or user_level <= target_level:
- raise AuthError(403, "You cannot kick user %s." % target_user_id)
+ raise UnstableSpecAuthError(
+ 403,
+ "You cannot kick user %s." % target_user_id,
+ errcode=Codes.INSUFFICIENT_POWER,
+ )
elif Membership.BAN == membership:
if user_level < ban_level or user_level <= target_level:
- raise AuthError(403, "You don't have permission to ban")
+ raise UnstableSpecAuthError(
+ 403,
+ "You don't have permission to ban",
+ errcode=Codes.INSUFFICIENT_POWER,
+ )
elif room_version.msc2403_knocking and Membership.KNOCK == membership:
if join_rule != JoinRules.KNOCK and (
not room_version.msc3787_knock_restricted_join_rule
@@ -567,7 +601,11 @@ def _is_membership_change_allowed(
elif target_user_id != event.user_id:
raise AuthError(403, "You cannot knock for other users")
elif target_in_room:
- raise AuthError(403, "You cannot knock on a room you are already in")
+ raise UnstableSpecAuthError(
+ 403,
+ "You cannot knock on a room you are already in",
+ errcode=Codes.ALREADY_JOINED,
+ )
elif caller_invited:
raise AuthError(403, "You are already invited to this room")
elif target_banned:
@@ -638,10 +676,11 @@ def _can_send_event(event: "EventBase", auth_events: StateMap["EventBase"]) -> b
user_level = get_user_power_level(event.user_id, auth_events)
if user_level < send_level:
- raise AuthError(
+ raise UnstableSpecAuthError(
403,
"You don't have permission to post that to the room. "
+ "user_level (%d) < send_level (%d)" % (user_level, send_level),
+ errcode=Codes.INSUFFICIENT_POWER,
)
# Check state_key
@@ -716,9 +755,10 @@ def check_historical(
historical_level = get_named_level(auth_events, "historical", 100)
if user_level < historical_level:
- raise AuthError(
+ raise UnstableSpecAuthError(
403,
'You don\'t have permission to send send historical related events ("insertion", "batch", and "marker")',
+ errcode=Codes.INSUFFICIENT_POWER,
)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 842f5327c2..6a8d76529b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -403,9 +403,9 @@ class FederationClient(FederationBase):
# Prime the cache
self._get_pdu_cache[event.event_id] = event
- # FIXME: We should add a `break` here to avoid calling every
- # destination after we already found a PDU (will follow-up
- # in a separate PR)
+ # Now that we have an event, we can break out of this
+ # loop and stop asking other destinations.
+ break
except SynapseError as e:
logger.info(
@@ -725,6 +725,12 @@ class FederationClient(FederationBase):
if failover_errcodes is None:
failover_errcodes = ()
+ if not destinations:
+ # Give a bit of a clearer message if no servers were specified at all.
+ raise SynapseError(
+ 502, f"Failed to {description} via any server: No servers specified."
+ )
+
for destination in destinations:
if destination == self.server_name:
continue
@@ -774,7 +780,7 @@ class FederationClient(FederationBase):
"Failed to %s via %s", description, destination, exc_info=True
)
- raise SynapseError(502, "Failed to %s via any server" % (description,))
+ raise SynapseError(502, f"Failed to {description} via any server")
async def make_membership_event(
self,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ae550d3f4d..1d60137411 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -469,7 +469,7 @@ class FederationServer(FederationBase):
)
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- pdu_results[event_id] = e.error_dict()
+ pdu_results[event_id] = e.error_dict(self.hs.config)
return
for pdu in pdus_by_room[room_id]:
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3d83236b0c..bfa5535044 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -565,7 +565,7 @@ class AuthHandler:
except LoginError as e:
# this step failed. Merge the error dict into the response
# so that the client can have another go.
- errordict = e.error_dict()
+ errordict = e.error_dict(self.hs.config)
creds = await self.store.get_completed_ui_auth_stages(session.session_id)
for f in flows:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3b5eaf5156..1cf6cb32e3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -546,9 +546,9 @@ class FederationHandler:
)
if ret.partial_state:
- # TODO(faster_joins): roll this back if we don't manage to start the
- # background resync (eg process_remote_join fails)
- # https://github.com/matrix-org/synapse/issues/12998
+ # Mark the room as having partial state.
+ # The background process is responsible for unmarking this flag,
+ # even if the join fails.
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
try:
@@ -574,17 +574,21 @@ class FederationHandler:
room_id,
)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
-
- if ret.partial_state:
- # Kick off the process of asynchronously fetching the state for this
- # room.
- run_as_background_process(
- desc="sync_partial_state_room",
- func=self._sync_partial_state_room,
- initial_destination=origin,
- other_destinations=ret.servers_in_room,
- room_id=room_id,
- )
+ finally:
+ # Always kick off the background process that asynchronously fetches
+ # state for the room.
+ # If the join failed, the background process is responsible for
+ # cleaning up — including unmarking the room as a partial state room.
+ if ret.partial_state:
+ # Kick off the process of asynchronously fetching the state for this
+ # room.
+ run_as_background_process(
+ desc="sync_partial_state_room",
+ func=self._sync_partial_state_room,
+ initial_destination=origin,
+ other_destinations=ret.servers_in_room,
+ room_id=room_id,
+ )
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 16f20c8be7..91d1439191 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -278,7 +278,8 @@ class FederationEventHandler:
)
try:
- await self._process_received_pdu(origin, pdu, state_ids=None)
+ context = await self._state_handler.compute_event_context(pdu)
+ await self._process_received_pdu(origin, pdu, context)
except PartialStateConflictError:
# The room was un-partial stated while we were processing the PDU.
# Try once more, with full state this time.
@@ -286,7 +287,8 @@ class FederationEventHandler:
"Room %s was un-partial stated while processing the PDU, trying again.",
room_id,
)
- await self._process_received_pdu(origin, pdu, state_ids=None)
+ context = await self._state_handler.compute_event_context(pdu)
+ await self._process_received_pdu(origin, pdu, context)
async def on_send_membership_event(
self, origin: str, event: EventBase
@@ -316,6 +318,7 @@ class FederationEventHandler:
The event and context of the event after inserting it into the room graph.
Raises:
+ RuntimeError if any prev_events are missing
SynapseError if the event is not accepted into the room
PartialStateConflictError if the room was un-partial stated in between
computing the state at the event and persisting it. The caller should
@@ -376,7 +379,7 @@ class FederationEventHandler:
# need to.
await self._event_creation_handler.cache_joined_hosts_for_event(event, context)
- await self._check_for_soft_fail(event, None, origin=origin)
+ await self._check_for_soft_fail(event, context=context, origin=origin)
await self._run_push_actions_and_persist_event(event, context)
return event, context
@@ -534,27 +537,30 @@ class FederationEventHandler:
#
# This is the same operation as we do when we receive a regular event
# over federation.
- state_ids = await self._resolve_state_at_missing_prevs(destination, event)
-
- # build a new state group for it if need be
- context = await self._state_handler.compute_event_context(
- event,
- state_ids_before_event=state_ids,
+ context = await self._compute_event_context_with_maybe_missing_prevs(
+ destination, event
)
if context.partial_state:
# this can happen if some or all of the event's prev_events still have
- # partial state - ie, an event has an earlier stream_ordering than one
- # or more of its prev_events, so we de-partial-state it before its
- # prev_events.
+ # partial state. We were careful to only pick events from the db without
+ # partial-state prev events, so that implies that a prev event has
+ # been persisted (with partial state) since we did the query.
#
- # TODO(faster_joins): we probably need to be more intelligent, and
- # exclude partial-state prev_events from consideration
- # https://github.com/matrix-org/synapse/issues/13001
+ # So, let's just ignore `event` for now; when we re-run the db query
+ # we should instead get its partial-state prev event, which we will
+ # de-partial-state, and then come back to event.
logger.warning(
- "%s still has partial state: can't de-partial-state it yet",
+ "%s still has prev_events with partial state: can't de-partial-state it yet",
event.event_id,
)
return
+
+ # since the state at this event has changed, we should now re-evaluate
+ # whether it should have been rejected. We must already have all of the
+ # auth events (from last time we went round this path), so there is no
+ # need to pass the origin.
+ await self._check_event_auth(None, event, context)
+
await self._store.update_state_for_partial_state_event(event, context)
self._state_storage_controller.notify_event_un_partial_stated(
event.event_id
@@ -806,29 +812,55 @@ class FederationEventHandler:
return
try:
- state_ids = await self._resolve_state_at_missing_prevs(origin, event)
- # TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does
- # not return partial state
- # https://github.com/matrix-org/synapse/issues/13002
+ try:
+ context = await self._compute_event_context_with_maybe_missing_prevs(
+ origin, event
+ )
+ await self._process_received_pdu(
+ origin,
+ event,
+ context,
+ backfilled=backfilled,
+ )
+ except PartialStateConflictError:
+ # The room was un-partial stated while we were processing the event.
+ # Try once more, with full state this time.
+ context = await self._compute_event_context_with_maybe_missing_prevs(
+ origin, event
+ )
- await self._process_received_pdu(
- origin, event, state_ids=state_ids, backfilled=backfilled
- )
+ # We ought to have full state now, barring some unlikely race where we left and
+ # rejoned the room in the background.
+ if context.partial_state:
+ raise AssertionError(
+ f"Event {event.event_id} still has a partial resolved state "
+ f"after room {event.room_id} was un-partial stated"
+ )
+
+ await self._process_received_pdu(
+ origin,
+ event,
+ context,
+ backfilled=backfilled,
+ )
except FederationError as e:
if e.code == 403:
logger.warning("Pulled event %s failed history check.", event_id)
else:
raise
- async def _resolve_state_at_missing_prevs(
+ async def _compute_event_context_with_maybe_missing_prevs(
self, dest: str, event: EventBase
- ) -> Optional[StateMap[str]]:
- """Calculate the state at an event with missing prev_events.
+ ) -> EventContext:
+ """Build an EventContext structure for a non-outlier event whose prev_events may
+ be missing.
- This is used when we have pulled a batch of events from a remote server, and
- still don't have all the prev_events.
+ This is used when we have pulled a batch of events from a remote server, and may
+ not have all the prev_events.
- If we already have all the prev_events for `event`, this method does nothing.
+ To build an EventContext, we need to calculate the state before the event. If we
+ already have all the prev_events for `event`, we can simply use the state after
+ the prev_events to calculate the state before `event`.
Otherwise, the missing prevs become new backwards extremities, and we fall back
to asking the remote server for the state after each missing `prev_event`,
@@ -849,8 +881,7 @@ class FederationEventHandler:
event: an event to check for missing prevs.
Returns:
- if we already had all the prev events, `None`. Otherwise, returns
- the event ids of the state at `event`.
+ The event context.
Raises:
FederationError if we fail to get the state from the remote server after any
@@ -864,7 +895,7 @@ class FederationEventHandler:
missing_prevs = prevs - seen
if not missing_prevs:
- return None
+ return await self._state_handler.compute_event_context(event)
logger.info(
"Event %s is missing prev_events %s: calculating state for a "
@@ -876,9 +907,15 @@ class FederationEventHandler:
# resolve them to find the correct state at the current event.
try:
+ # Determine whether we may be about to retrieve partial state
+ # Events may be un-partial stated right after we compute the partial state
+ # flag, but that's okay, as long as the flag errs on the conservative side.
+ partial_state_flags = await self._store.get_partial_state_events(seen)
+ partial_state = any(partial_state_flags.values())
+
# Get the state of the events we know about
ours = await self._state_storage_controller.get_state_groups_ids(
- room_id, seen
+ room_id, seen, await_full_state=False
)
# state_maps is a list of mappings from (type, state_key) to event_id
@@ -924,7 +961,9 @@ class FederationEventHandler:
"We can't get valid state history.",
affected=event_id,
)
- return state_map
+ return await self._state_handler.compute_event_context(
+ event, state_ids_before_event=state_map, partial_state=partial_state
+ )
async def _get_state_ids_after_missing_prev_event(
self,
@@ -1093,7 +1132,7 @@ class FederationEventHandler:
self,
origin: str,
event: EventBase,
- state_ids: Optional[StateMap[str]],
+ context: EventContext,
backfilled: bool = False,
) -> None:
"""Called when we have a new non-outlier event.
@@ -1115,24 +1154,18 @@ class FederationEventHandler:
event: event to be persisted
- state_ids: Normally None, but if we are handling a gap in the graph
- (ie, we are missing one or more prev_events), the resolved state at the
- event. Must not be partial state.
+ context: The `EventContext` to persist the event with.
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
PartialStateConflictError: if the room was un-partial stated in between
- computing the state at the event and persisting it. The caller should retry
- exactly once in this case. Will never be raised if `state_ids` is provided.
+ computing the state at the event and persisting it. The caller should
+ recompute `context` and retry exactly once when this happens.
"""
logger.debug("Processing event: %s", event)
assert not event.internal_metadata.outlier
- context = await self._state_handler.compute_event_context(
- event,
- state_ids_before_event=state_ids,
- )
try:
await self._check_event_auth(origin, event, context)
except AuthError as e:
@@ -1144,7 +1177,7 @@ class FederationEventHandler:
# For new (non-backfilled and non-outlier) events we check if the event
# passes auth based on the current state. If it doesn't then we
# "soft-fail" the event.
- await self._check_for_soft_fail(event, state_ids, origin=origin)
+ await self._check_for_soft_fail(event, context=context, origin=origin)
await self._run_push_actions_and_persist_event(event, context, backfilled)
@@ -1556,13 +1589,15 @@ class FederationEventHandler:
)
async def _check_event_auth(
- self, origin: str, event: EventBase, context: EventContext
+ self, origin: Optional[str], event: EventBase, context: EventContext
) -> None:
"""
Checks whether an event should be rejected (for failing auth checks).
Args:
- origin: The host the event originates from.
+ origin: The host the event originates from. This is used to fetch
+ any missing auth events. It can be set to None, but only if we are
+ sure that we already have all the auth events.
event: The event itself.
context:
The event context.
@@ -1705,7 +1740,7 @@ class FederationEventHandler:
async def _check_for_soft_fail(
self,
event: EventBase,
- state_ids: Optional[StateMap[str]],
+ context: EventContext,
origin: str,
) -> None:
"""Checks if we should soft fail the event; if so, marks the event as
@@ -1716,7 +1751,7 @@ class FederationEventHandler:
Args:
event
- state_ids: The state at the event if we don't have all the event's prev events
+ context: The `EventContext` which we are about to persist the event with.
origin: The host the event originates from.
"""
if await self._store.is_partial_state_room(event.room_id):
@@ -1742,11 +1777,15 @@ class FederationEventHandler:
auth_types = auth_types_for_event(room_version_obj, event)
# Calculate the "current state".
- if state_ids is not None:
- # If we're explicitly given the state then we won't have all the
- # prev events, and so we have a gap in the graph. In this case
- # we want to be a little careful as we might have been down for
- # a while and have an incorrect view of the current state,
+ seen_event_ids = await self._store.have_events_in_timeline(prev_event_ids)
+ has_missing_prevs = bool(prev_event_ids - seen_event_ids)
+ if has_missing_prevs:
+ # We don't have all the prev_events of this event, which means we have a
+ # gap in the graph, and the new event is going to become a new backwards
+ # extremity.
+ #
+ # In this case we want to be a little careful as we might have been
+ # down for a while and have an incorrect view of the current state,
# however we still want to do checks as gaps are easy to
# maliciously manufacture.
#
@@ -1759,6 +1798,7 @@ class FederationEventHandler:
event.room_id, extrem_ids
)
state_sets: List[StateMap[str]] = list(state_sets_d.values())
+ state_ids = await context.get_prev_state_ids()
state_sets.append(state_ids)
current_state_ids = (
await self._state_resolution_handler.resolve_events_with_store(
@@ -1808,7 +1848,7 @@ class FederationEventHandler:
event.internal_metadata.soft_failed = True
async def _load_or_fetch_auth_events_for_event(
- self, destination: str, event: EventBase
+ self, destination: Optional[str], event: EventBase
) -> Collection[EventBase]:
"""Fetch this event's auth_events, from database or remote
@@ -1824,12 +1864,19 @@ class FederationEventHandler:
Args:
destination: where to send the /event_auth request. Typically the server
that sent us `event` in the first place.
+
+ If this is None, no attempt is made to load any missing auth events:
+ rather, an AssertionError is raised if there are any missing events.
+
event: the event whose auth_events we want
Returns:
all of the events listed in `event.auth_events_ids`, after deduplication
Raises:
+ AssertionError if some auth events were missing and no `destination` was
+ supplied.
+
AuthError if we were unable to fetch the auth_events for any reason.
"""
event_auth_event_ids = set(event.auth_event_ids())
@@ -1841,6 +1888,13 @@ class FederationEventHandler:
)
if not missing_auth_event_ids:
return event_auth_events.values()
+ if destination is None:
+ # this shouldn't happen: destination must be set unless we know we have already
+ # persisted the auth events.
+ raise AssertionError(
+ "_load_or_fetch_auth_events_for_event() called with no destination for "
+ "an event with missing auth_events"
+ )
logger.info(
"Event %s refers to unknown auth events %s: fetching auth chain",
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index bd7baef051..e85b540451 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -41,6 +41,7 @@ from synapse.api.errors import (
NotFoundError,
ShadowBanError,
SynapseError,
+ UnstableSpecAuthError,
UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -149,7 +150,11 @@ class MessageHandler:
"Attempted to retrieve data from a room for a user that has never been in it. "
"This should not have happened."
)
- raise SynapseError(403, "User not in room", errcode=Codes.FORBIDDEN)
+ raise UnstableSpecAuthError(
+ 403,
+ "User not in room",
+ errcode=Codes.NOT_JOINED,
+ )
return data
@@ -334,7 +339,11 @@ class MessageHandler:
break
else:
# Loop fell through, AS has no interested users in room
- raise AuthError(403, "Appservice not in room")
+ raise UnstableSpecAuthError(
+ 403,
+ "Appservice not in room",
+ errcode=Codes.NOT_JOINED,
+ )
return {
user_id: {
@@ -1135,6 +1144,10 @@ class EventCreationHandler:
context = await self.state.compute_event_context(
event,
state_ids_before_event=state_map_for_event,
+ # TODO(faster_joins): check how MSC2716 works and whether we can have
+ # partial state here
+ # https://github.com/matrix-org/synapse/issues/13003
+ partial_state=False,
)
else:
context = await self.state.compute_event_context(event)
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 0b63cd2186..8f797e3ae9 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -73,7 +73,6 @@ class RelationsHandler:
room_id: str,
relation_type: Optional[str] = None,
event_type: Optional[str] = None,
- aggregation_key: Optional[str] = None,
limit: int = 5,
direction: str = "b",
from_token: Optional[StreamToken] = None,
@@ -89,7 +88,6 @@ class RelationsHandler:
room_id: The room the event belongs to.
relation_type: Only fetch events with this relation type, if given.
event_type: Only fetch events with this event type, if given.
- aggregation_key: Only fetch events with this aggregation key, if given.
limit: Only fetch the most recent `limit` events.
direction: Whether to fetch the most recent first (`"b"`) or the
oldest first (`"f"`).
@@ -122,7 +120,6 @@ class RelationsHandler:
room_id=room_id,
relation_type=relation_type,
event_type=event_type,
- aggregation_key=aggregation_key,
limit=limit,
direction=direction,
from_token=from_token,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 29868eb743..bb0bdb8e6f 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -182,7 +182,7 @@ class RoomListHandler:
== HistoryVisibility.WORLD_READABLE,
"guest_can_join": room["guest_access"] == "can_join",
"join_rule": room["join_rules"],
- "org.matrix.msc3827.room_type": room["room_type"],
+ "room_type": room["room_type"],
}
# Filter out Nones – rather omit the field altogether
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 30b4cb23df..520c52e013 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1679,7 +1679,11 @@ class RoomMemberMasterHandler(RoomMemberHandler):
]
if len(remote_room_hosts) == 0:
- raise SynapseError(404, "No known servers")
+ raise SynapseError(
+ 404,
+ "Can't join remote room because no servers "
+ "that are in the room have been provided.",
+ )
check_complexity = self.hs.config.server.limit_remote_rooms.enabled
if (
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 13098f56ed..ebd445adca 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -28,11 +28,11 @@ from synapse.api.constants import (
RoomTypes,
)
from synapse.api.errors import (
- AuthError,
Codes,
NotFoundError,
StoreError,
SynapseError,
+ UnstableSpecAuthError,
UnsupportedRoomVersionError,
)
from synapse.api.ratelimiting import Ratelimiter
@@ -175,10 +175,11 @@ class RoomSummaryHandler:
# First of all, check that the room is accessible.
if not await self._is_local_room_accessible(requested_room_id, requester):
- raise AuthError(
+ raise UnstableSpecAuthError(
403,
"User %s not in room %s, and room previews are disabled"
% (requester, requested_room_id),
+ errcode=Codes.NOT_JOINED,
)
# If this is continuing a previous session, pull the persisted data.
@@ -452,7 +453,6 @@ class RoomSummaryHandler:
"type": e.type,
"state_key": e.state_key,
"content": e.content,
- "room_id": e.room_id,
"sender": e.sender,
"origin_server_ts": e.origin_server_ts,
}
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d104ea07fe..27aa0d3126 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -489,8 +489,15 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
handler = self.get_typing_handler()
events = []
- for room_id in handler._room_serials.keys():
- if handler._room_serials[room_id] <= from_key:
+
+ # Work on a copy of things here as these may change in the handler while
+ # waiting for the AS `is_interested_in_room` call to complete.
+ # Shallow copy is safe as no nested data is present.
+ latest_room_serial = handler._latest_room_serial
+ room_serials = handler._room_serials.copy()
+
+ for room_id, serial in room_serials.items():
+ if serial <= from_key:
continue
if not await service.is_interested_in_room(room_id, self._main_store):
@@ -498,7 +505,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
events.append(self._make_event_for(room_id))
- return events, handler._latest_room_serial
+ return events, latest_room_serial
async def get_new_events(
self,
diff --git a/synapse/http/server.py b/synapse/http/server.py
index cf2d6f904b..19f42159b8 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -58,6 +58,7 @@ from synapse.api.errors import (
SynapseError,
UnrecognizedRequestError,
)
+from synapse.config.homeserver import HomeServerConfig
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
@@ -155,15 +156,16 @@ def is_method_cancellable(method: Callable[..., Any]) -> bool:
return getattr(method, "cancellable", False)
-def return_json_error(f: failure.Failure, request: SynapseRequest) -> None:
+def return_json_error(
+ f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig]
+) -> None:
"""Sends a JSON error response to clients."""
if f.check(SynapseError):
# mypy doesn't understand that f.check asserts the type.
exc: SynapseError = f.value # type: ignore
error_code = exc.code
- error_dict = exc.error_dict()
-
+ error_dict = exc.error_dict(config)
logger.info("%s SynapseError: %s - %s", request, error_code, exc.msg)
elif f.check(CancelledError):
error_code = HTTP_STATUS_REQUEST_CANCELLED
@@ -450,7 +452,7 @@ class DirectServeJsonResource(_AsyncResource):
request: SynapseRequest,
) -> None:
"""Implements _AsyncResource._send_error_response"""
- return_json_error(f, request)
+ return_json_error(f, request, None)
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -575,6 +577,14 @@ class JsonResource(DirectServeJsonResource):
return callback_return
+ def _send_error_response(
+ self,
+ f: failure.Failure,
+ request: SynapseRequest,
+ ) -> None:
+ """Implements _AsyncResource._send_error_response"""
+ return_json_error(f, request, self.hs.config)
+
class DirectServeHtmlResource(_AsyncResource):
"""A resource that will call `self._async_on_<METHOD>` on new requests,
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index f4f06563dd..0366986755 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -95,8 +95,8 @@ class VersionsRestServlet(RestServlet):
"org.matrix.msc3026.busy_presence": self.config.experimental.msc3026_enabled,
# Supports receiving private read receipts as per MSC2285
"org.matrix.msc2285": self.config.experimental.msc2285_enabled,
- # Supports filtering of /publicRooms by room type MSC3827
- "org.matrix.msc3827": self.config.experimental.msc3827_enabled,
+ # Supports filtering of /publicRooms by room type as per MSC3827
+ "org.matrix.msc3827.stable": True,
# Adds support for importing historical messages as per MSC2716
"org.matrix.msc2716": self.config.experimental.msc2716_enabled,
# Adds support for jump to date endpoints (/timestamp_to_event) as per MSC3030
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 87ccd52f0a..c355e4f98a 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -255,7 +255,7 @@ class StateHandler:
self,
event: EventBase,
state_ids_before_event: Optional[StateMap[str]] = None,
- partial_state: bool = False,
+ partial_state: Optional[bool] = None,
) -> EventContext:
"""Build an EventContext structure for a non-outlier event.
@@ -270,10 +270,18 @@ class StateHandler:
it can't be calculated from existing events. This is normally
only specified when receiving an event from federation where we
don't have the prev events, e.g. when backfilling.
- partial_state: True if `state_ids_before_event` is partial and omits
- non-critical membership events
+ partial_state:
+ `True` if `state_ids_before_event` is partial and omits non-critical
+ membership events.
+ `False` if `state_ids_before_event` is the full state.
+ `None` when `state_ids_before_event` is not provided. In this case, the
+ flag will be calculated based on `event`'s prev events.
Returns:
The event context.
+
+ Raises:
+ RuntimeError if `state_ids_before_event` is not provided and one or more
+ prev events are missing or outliers.
"""
assert not event.internal_metadata.is_outlier()
@@ -298,12 +306,14 @@ class StateHandler:
)
)
+ # the partial_state flag must be provided
+ assert partial_state is not None
else:
# otherwise, we'll need to resolve the state across the prev_events.
# partial_state should not be set explicitly in this case:
# we work it out dynamically
- assert not partial_state
+ assert partial_state is None
# if any of the prev-events have partial state, so do we.
# (This is slightly racy - the prev-events might get fixed up before we use
@@ -313,13 +323,13 @@ class StateHandler:
incomplete_prev_events = await self.store.get_partial_state_events(
prev_event_ids
)
- if any(incomplete_prev_events.values()):
+ partial_state = any(incomplete_prev_events.values())
+ if partial_state:
logger.debug(
"New/incoming event %s refers to prev_events %s with partial state",
event.event_id,
[k for (k, v) in incomplete_prev_events.items() if v],
)
- partial_state = True
logger.debug("calling resolve_state_groups from compute_event_context")
# we've already taken into account partial state, so no need to wait for
@@ -426,6 +436,10 @@ class StateHandler:
Returns:
The resolved state
+
+ Raises:
+ RuntimeError if we don't have a state group for one or more of the events
+ (ie. they are outliers or unknown)
"""
logger.debug("resolve_state_groups event_ids %s", event_ids)
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index e08f956e6e..1e35046e07 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -82,13 +82,15 @@ class StateStorageController:
return state_group_delta.prev_group, state_group_delta.delta_ids
async def get_state_groups_ids(
- self, _room_id: str, event_ids: Collection[str]
+ self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True
) -> Dict[int, MutableStateMap[str]]:
"""Get the event IDs of all the state for the state groups for the given events
Args:
_room_id: id of the room for these events
event_ids: ids of the events
+ await_full_state: if `True`, will block if we do not yet have complete
+ state at these events.
Returns:
dict of state_group_id -> (dict of (type, state_key) -> event id)
@@ -100,7 +102,9 @@ class StateStorageController:
if not event_ids:
return {}
- event_to_groups = await self.get_state_group_for_events(event_ids)
+ event_to_groups = await self.get_state_group_for_events(
+ event_ids, await_full_state=await_full_state
+ )
groups = set(event_to_groups.values())
group_to_state = await self.stores.state._get_state_for_groups(groups)
@@ -334,6 +338,10 @@ class StateStorageController:
event_ids: events to get state groups for
await_full_state: if true, will block if we do not yet have complete
state at these events.
+
+ Raises:
+ RuntimeError if we don't have a state group for one or more of the events
+ (ie. they are outliers or unknown)
"""
if await_full_state:
await self._partial_state_events_tracker.await_full_state(event_ids)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 5914a35420..29c99c6357 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -2110,11 +2110,29 @@ class EventsWorkerStore(SQLBaseStore):
def _get_partial_state_events_batch_txn(
txn: LoggingTransaction, room_id: str
) -> List[str]:
+ # we want to work through the events from oldest to newest, so
+ # we only want events whose prev_events do *not* have partial state - hence
+ # the 'NOT EXISTS' clause in the below.
+ #
+ # This is necessary because ordering by stream ordering isn't quite enough
+ # to ensure that we work from oldest to newest event (in particular,
+ # if an event is initially persisted as an outlier and later de-outliered,
+ # it can end up with a lower stream_ordering than its prev_events).
+ #
+ # Typically this means we'll only return one event per batch, but that's
+ # hard to do much about.
+ #
+ # See also: https://github.com/matrix-org/synapse/issues/13001
txn.execute(
"""
SELECT event_id FROM partial_state_events AS pse
JOIN events USING (event_id)
- WHERE pse.room_id = ?
+ WHERE pse.room_id = ? AND
+ NOT EXISTS(
+ SELECT 1 FROM event_edges AS ee
+ JOIN partial_state_events AS prev_pse ON (prev_pse.event_id=ee.prev_event_id)
+ WHERE ee.event_id=pse.event_id
+ )
ORDER BY events.stream_ordering
LIMIT 100
""",
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index b457bc189e..7bd27790eb 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -62,7 +62,6 @@ class RelationsWorkerStore(SQLBaseStore):
room_id: str,
relation_type: Optional[str] = None,
event_type: Optional[str] = None,
- aggregation_key: Optional[str] = None,
limit: int = 5,
direction: str = "b",
from_token: Optional[StreamToken] = None,
@@ -76,7 +75,6 @@ class RelationsWorkerStore(SQLBaseStore):
room_id: The room the event belongs to.
relation_type: Only fetch events with this relation type, if given.
event_type: Only fetch events with this event type, if given.
- aggregation_key: Only fetch events with this aggregation key, if given.
limit: Only fetch the most recent `limit` events.
direction: Whether to fetch the most recent first (`"b"`) or the
oldest first (`"f"`).
@@ -105,10 +103,6 @@ class RelationsWorkerStore(SQLBaseStore):
where_clause.append("type = ?")
where_args.append(event_type)
- if aggregation_key:
- where_clause.append("aggregation_key = ?")
- where_args.append(aggregation_key)
-
pagination_clause = generate_pagination_where_clause(
direction=direction,
column_names=("topological_ordering", "stream_ordering"),
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index d6d485507b..0f1f0d11ea 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -207,7 +207,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
def _construct_room_type_where_clause(
self, room_types: Union[List[Union[str, None]], None]
) -> Tuple[Union[str, None], List[str]]:
- if not room_types or not self.config.experimental.msc3827_enabled:
+ if not room_types:
return None, []
else:
# We use None when we want get rooms without a type
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 9674c4a757..f70705a0af 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -419,13 +419,15 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# anything that was rejected should have the same state as its
# predecessor.
if context.rejected:
- assert context.state_group == context.state_group_before_event
+ state_group = context.state_group_before_event
+ else:
+ state_group = context.state_group
self.db_pool.simple_update_txn(
txn,
table="event_to_state_groups",
keyvalues={"event_id": event.event_id},
- updatevalues={"state_group": context.state_group},
+ updatevalues={"state_group": state_group},
)
self.db_pool.simple_delete_one_txn(
@@ -440,7 +442,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
txn.call_after(
self._get_state_group_for_event.prefill,
(event.event_id,),
- context.state_group,
+ state_group,
)
|