diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f455158a2c..72157d5a36 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -37,12 +37,14 @@ from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
+ LimitExceededError,
NotFoundError,
ShadowBanError,
SynapseError,
+ UnstableSpecAuthError,
UnsupportedRoomVersionError,
)
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.api.urls import ConsentURIBuilder
from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase, relation_from_event
@@ -50,9 +52,11 @@ from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
+from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -100,7 +104,7 @@ class MessageHandler:
async def get_room_data(
self,
- user_id: str,
+ requester: Requester,
room_id: str,
event_type: str,
state_key: str,
@@ -108,7 +112,7 @@ class MessageHandler:
"""Get data from a room.
Args:
- user_id
+ requester: The user who did the request.
room_id
event_type
state_key
@@ -121,7 +125,7 @@ class MessageHandler:
membership,
membership_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
- room_id, user_id, allow_departed_users=True
+ room_id, requester, allow_departed_users=True
)
if membership == Membership.JOIN:
@@ -147,17 +151,20 @@ class MessageHandler:
"Attempted to retrieve data from a room for a user that has never been in it. "
"This should not have happened."
)
- raise SynapseError(403, "User not in room", errcode=Codes.FORBIDDEN)
+ raise UnstableSpecAuthError(
+ 403,
+ "User not in room",
+ errcode=Codes.NOT_JOINED,
+ )
return data
async def get_state_events(
self,
- user_id: str,
+ requester: Requester,
room_id: str,
state_filter: Optional[StateFilter] = None,
at_token: Optional[StreamToken] = None,
- is_guest: bool = False,
) -> List[dict]:
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
@@ -166,14 +173,13 @@ class MessageHandler:
visible.
Args:
- user_id: The user requesting state events.
+ requester: The user requesting state events.
room_id: The room ID to get all state events from.
state_filter: The state filter used to fetch state from the database.
at_token: the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
state based on the current_state_events table.
- is_guest: whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
Raises:
@@ -183,6 +189,7 @@ class MessageHandler:
members of this room.
"""
state_filter = state_filter or StateFilter.all()
+ user_id = requester.user.to_string()
if at_token:
last_event_id = (
@@ -215,7 +222,7 @@ class MessageHandler:
membership,
membership_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
- room_id, user_id, allow_departed_users=True
+ room_id, requester, allow_departed_users=True
)
if membership == Membership.JOIN:
@@ -309,30 +316,42 @@ class MessageHandler:
Returns:
A dict of user_id to profile info
"""
- user_id = requester.user.to_string()
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
membership, _ = await self.auth.check_user_in_room_or_world_readable(
- room_id, user_id, allow_departed_users=True
+ room_id, requester, allow_departed_users=True
)
if membership != Membership.JOIN:
- raise NotImplementedError(
- "Getting joined members after leaving is not implemented"
+ raise SynapseError(
+ code=403,
+ errcode=Codes.FORBIDDEN,
+ msg="Getting joined members while not being a current member of the room is forbidden.",
)
- users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
+ users_with_profile = (
+ await self._state_storage_controller.get_users_in_room_with_profiles(
+ room_id
+ )
+ )
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
+ if (
+ requester.app_service
+ and requester.user.to_string() not in users_with_profile
+ ):
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
else:
# Loop fell through, AS has no interested users in room
- raise AuthError(403, "Appservice not in room")
+ raise UnstableSpecAuthError(
+ 403,
+ "Appservice not in room",
+ errcode=Codes.NOT_JOINED,
+ )
return {
user_id: {
@@ -444,7 +463,7 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
class EventCreationHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
- self.auth = hs.get_auth()
+ self.auth_blocking = hs.get_auth_blocking()
self._event_auth_handler = hs.get_event_auth_handler()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
@@ -461,6 +480,7 @@ class EventCreationHandler:
)
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
+ self._notifier = hs.get_notifier()
self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
@@ -605,7 +625,7 @@ class EventCreationHandler:
Returns:
Tuple of created event, Context
"""
- await self.auth.check_auth_blocking(requester=requester)
+ await self.auth_blocking.check_auth_blocking(requester=requester)
if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
room_version_id = event_dict["content"]["room_version"]
@@ -741,8 +761,10 @@ class EventCreationHandler:
async def _is_server_notices_room(self, room_id: str) -> bool:
if self.config.servernotices.server_notices_mxid is None:
return False
- user_ids = await self.store.get_users_in_room(room_id)
- return self.config.servernotices.server_notices_mxid in user_ids
+ is_server_notices_room = await self.store.check_local_user_in_room(
+ user_id=self.config.servernotices.server_notices_mxid, room_id=room_id
+ )
+ return is_server_notices_room
async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
"""Check if a user has accepted the privacy policy
@@ -903,6 +925,9 @@ class EventCreationHandler:
await self.clock.sleep(random.randint(1, 10))
raise ShadowBanError()
+ if ratelimit:
+ await self.request_ratelimiter.ratelimit(requester, update=False)
+
# We limit the number of concurrent event sends in a room so that we
# don't fork the DAG too much. If we don't limit then we can end up in
# a situation where event persistence can't keep up, causing
@@ -954,14 +979,12 @@ class EventCreationHandler:
"Spam-check module returned invalid error value. Expecting [code, dict], got %s",
spam_check_result,
)
- spam_check_result = Codes.FORBIDDEN
- if isinstance(spam_check_result, Codes):
- raise SynapseError(
- 403,
- "This message has been rejected as probable spam",
- spam_check_result,
- )
+ raise SynapseError(
+ 403,
+ "This message has been rejected as probable spam",
+ Codes.FORBIDDEN,
+ )
# Backwards compatibility: if the return value is not an error code, it
# means the module returned an error message to be included in the
@@ -1102,6 +1125,7 @@ class EventCreationHandler:
#
# TODO(faster_joins): figure out how this works, and make sure that the
# old state is complete.
+ # https://github.com/matrix-org/synapse/issues/13003
metadata = await self.store.get_metadata_for_events(state_event_ids)
state_map_for_event: MutableStateMap[str] = {}
@@ -1130,6 +1154,10 @@ class EventCreationHandler:
context = await self.state.compute_event_context(
event,
state_ids_before_event=state_map_for_event,
+ # TODO(faster_joins): check how MSC2716 works and whether we can have
+ # partial state here
+ # https://github.com/matrix-org/synapse/issues/13003
+ partial_state=False,
)
else:
context = await self.state.compute_event_context(event)
@@ -1248,6 +1276,8 @@ class EventCreationHandler:
Raises:
ShadowBanError if the requester has been shadow-banned.
+ SynapseError(503) if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
extra_users = extra_users or []
@@ -1273,23 +1303,6 @@ class EventCreationHandler:
)
return prev_event
- if event.is_state() and (event.type, event.state_key) == (
- EventTypes.Create,
- "",
- ):
- room_version_id = event.content.get(
- "room_version", RoomVersions.V1.identifier
- )
- maybe_room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version_id)
- if not maybe_room_version_obj:
- raise UnsupportedRoomVersionError(
- "Attempt to create a room with unsupported room version %s"
- % (room_version_id,)
- )
- room_version_obj = maybe_room_version_obj
- else:
- room_version_obj = await self.store.get_room_version(event.room_id)
-
if event.internal_metadata.is_out_of_band_membership():
# the only sort of out-of-band-membership events we expect to see here are
# invite rejections and rescinded knocks that we have generated ourselves.
@@ -1297,9 +1310,9 @@ class EventCreationHandler:
assert event.content["membership"] == Membership.LEAVE
else:
try:
- validate_event_for_room_version(room_version_obj, event)
+ validate_event_for_room_version(event)
await self._event_auth_handler.check_auth_rules_from_context(
- room_version_obj, event, context
+ event, context
)
except AuthError as err:
logger.warning("Denying new event %r because %s", event, err)
@@ -1315,24 +1328,35 @@ class EventCreationHandler:
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
- result, _ = await make_deferred_yieldable(
- gather_results(
- (
- run_in_background(
- self._persist_event,
- requester=requester,
- event=event,
- context=context,
- ratelimit=ratelimit,
- extra_users=extra_users,
+ try:
+ result, _ = await make_deferred_yieldable(
+ gather_results(
+ (
+ run_in_background(
+ self._persist_event,
+ requester=requester,
+ event=event,
+ context=context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ ),
+ run_in_background(
+ self.cache_joined_hosts_for_event, event, context
+ ).addErrback(
+ log_failure, "cache_joined_hosts_for_event failed"
+ ),
),
- run_in_background(
- self.cache_joined_hosts_for_event, event, context
- ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
- ),
- consumeErrors=True,
+ consumeErrors=True,
+ )
+ ).addErrback(unwrapFirstError)
+ except PartialStateConflictError as e:
+ # The event context needs to be recomputed.
+ # Turn the error into a 429, as a hint to the client to try again.
+ logger.info(
+ "Room %s was un-partial stated while persisting client event.",
+ event.room_id,
)
- ).addErrback(unwrapFirstError)
+ raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
return result
@@ -1347,6 +1371,9 @@ class EventCreationHandler:
"""Actually persists the event. Should only be called by
`handle_new_client_event`, and see its docstring for documentation of
the arguments.
+
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
# Skip push notification actions for historical messages
@@ -1355,24 +1382,30 @@ class EventCreationHandler:
# and `state_groups` because they have `prev_events` that aren't persisted yet
# (historical messages persisted in reverse-chronological order).
if not event.internal_metadata.is_historical():
- await self._bulk_push_rule_evaluator.action_for_event_by_user(
- event, context
- )
+ with opentracing.start_active_span("calculate_push_actions"):
+ await self._bulk_push_rule_evaluator.action_for_event_by_user(
+ event, context
+ )
try:
# If we're a worker we need to hit out to the master.
writer_instance = self._events_shard_config.get_instance(event.room_id)
if writer_instance != self._instance_name:
- result = await self.send_event(
- instance_name=writer_instance,
- event_id=event.event_id,
- store=self.store,
- requester=requester,
- event=event,
- context=context,
- ratelimit=ratelimit,
- extra_users=extra_users,
- )
+ try:
+ result = await self.send_event(
+ instance_name=writer_instance,
+ event_id=event.event_id,
+ store=self.store,
+ requester=requester,
+ event=event,
+ context=context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ )
+ except SynapseError as e:
+ if e.code == HTTPStatus.CONFLICT:
+ raise PartialStateConflictError()
+ raise
stream_id = result["stream_id"]
event_id = result["event_id"]
if event_id != event.event_id:
@@ -1436,7 +1469,13 @@ class EventCreationHandler:
if state_entry.state_group in self._external_cache_joined_hosts_updates:
return
- joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
+ state = await state_entry.get_state(
+ self._storage_controllers.state, StateFilter.all()
+ )
+ with opentracing.start_active_span("get_joined_hosts"):
+ joined_hosts = await self.store.get_joined_hosts(
+ event.room_id, state, state_entry
+ )
# Note that the expiry times must be larger than the expiry time in
# _external_cache_joined_hosts_updates.
@@ -1500,6 +1539,10 @@ class EventCreationHandler:
The persisted event. This may be different than the given event if
it was de-duplicated (e.g. because we had already persisted an
event with the same transaction ID.)
+
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
extra_users = extra_users or []
@@ -1533,6 +1576,16 @@ class EventCreationHandler:
requester, is_admin_redaction=is_admin_redaction
)
+ if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+ (
+ current_membership,
+ _,
+ ) = await self.store.get_local_current_membership_for_user_in_room(
+ event.state_key, event.room_id
+ )
+ if current_membership != Membership.JOIN:
+ self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+
await self._maybe_kick_guest_users(event, context)
if event.type == EventTypes.CanonicalAlias:
@@ -1832,13 +1885,8 @@ class EventCreationHandler:
# For each room we need to find a joined member we can use to send
# the dummy event with.
- latest_event_ids = await self.store.get_prev_events_for_room(room_id)
- members = await self.state.get_current_users_in_room(
- room_id, latest_event_ids=latest_event_ids
- )
+ members = await self.store.get_local_users_in_room(room_id)
for user_id in members:
- if not self.hs.is_mine_id(user_id):
- continue
requester = create_requester(user_id, authenticated_entity=self.server_name)
try:
event, context = await self.create_event(
@@ -1849,7 +1897,6 @@ class EventCreationHandler:
"room_id": room_id,
"sender": user_id,
},
- prev_event_ids=latest_event_ids,
)
event.internal_metadata.proactively_send = False
|