diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 55395457c3..6dcfd86fdf 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -19,6 +19,7 @@ import math
import random
import string
from collections import OrderedDict
+from http import HTTPStatus
from typing import (
TYPE_CHECKING,
Any,
@@ -48,7 +49,6 @@ from synapse.api.constants import (
from synapse.api.errors import (
AuthError,
Codes,
- HttpResponseException,
LimitExceededError,
NotFoundError,
StoreError,
@@ -59,8 +59,6 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase
from synapse.events.utils import copy_and_fixup_power_levels_contents
-from synapse.federation.federation_client import InvalidResponseError
-from synapse.handlers.federation import get_domains_from_state
from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
@@ -229,9 +227,7 @@ class RoomCreationHandler:
},
)
validate_event_for_room_version(tombstone_event)
- await self._event_auth_handler.check_auth_rules_from_context(
- tombstone_event, tombstone_context
- )
+ await self._event_auth_handler.check_auth_rules_from_context(tombstone_event)
# Upgrade the room
#
@@ -301,8 +297,7 @@ class RoomCreationHandler:
# now send the tombstone
await self.event_creation_handler.handle_new_client_event(
requester=requester,
- event=tombstone_event,
- context=tombstone_context,
+ events_and_context=[(tombstone_event, tombstone_context)],
)
state_filter = StateFilter.from_types(
@@ -562,7 +557,6 @@ class RoomCreationHandler:
invite_list=[],
initial_state=initial_state,
creation_content=creation_content,
- ratelimit=False,
)
# Transfer membership events
@@ -705,8 +699,8 @@ class RoomCreationHandler:
was, requested, `room_alias`. Secondly, the stream_id of the
last persisted event.
Raises:
- SynapseError if the room ID couldn't be stored, or something went
- horribly wrong.
+ SynapseError if the room ID couldn't be stored, 3pid invitation config
+ validation failed, or something went horribly wrong.
ResourceLimitError if server is blocked to some resource being
exceeded
"""
@@ -716,12 +710,12 @@ class RoomCreationHandler:
if (
self._server_notices_mxid is not None
- and requester.user.to_string() == self._server_notices_mxid
+ and user_id == self._server_notices_mxid
):
# allow the server notices mxid to create rooms
is_requester_admin = True
else:
- is_requester_admin = await self.auth.is_server_admin(requester.user)
+ is_requester_admin = await self.auth.is_server_admin(requester)
# Let the third party rules modify the room creation config if needed, or abort
# the room creation entirely with an exception.
@@ -732,6 +726,19 @@ class RoomCreationHandler:
invite_3pid_list = config.get("invite_3pid", [])
invite_list = config.get("invite", [])
+ # validate each entry for correctness
+ for invite_3pid in invite_3pid_list:
+ if not all(
+ key in invite_3pid
+ for key in ("medium", "address", "id_server", "id_access_token")
+ ):
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "all of `medium`, `address`, `id_server` and `id_access_token` "
+ "are required when making a 3pid invite",
+ Codes.MISSING_PARAM,
+ )
+
if not is_requester_admin:
spam_check = await self.spam_checker.user_may_create_room(user_id)
if spam_check != NOT_SPAM:
@@ -743,6 +750,10 @@ class RoomCreationHandler:
)
if ratelimit:
+ # Rate limit once in advance, but don't rate limit the individual
+ # events in the room — room creation isn't atomic and it's very
+ # janky if half the events in the initial state don't make it because
+ # of rate limiting.
await self.request_ratelimiter.ratelimit(requester)
room_version_id = config.get(
@@ -903,7 +914,6 @@ class RoomCreationHandler:
room_alias=room_alias,
power_level_content_override=power_level_content_override,
creator_join_profile=creator_join_profile,
- ratelimit=ratelimit,
)
if "name" in config:
@@ -979,7 +989,7 @@ class RoomCreationHandler:
for invite_3pid in invite_3pid_list:
id_server = invite_3pid["id_server"]
- id_access_token = invite_3pid.get("id_access_token") # optional
+ id_access_token = invite_3pid["id_access_token"]
address = invite_3pid["address"]
medium = invite_3pid["medium"]
# Note that do_3pid_invite can raise a ShadowBanError, but this was
@@ -1027,26 +1037,36 @@ class RoomCreationHandler:
room_alias: Optional[RoomAlias] = None,
power_level_content_override: Optional[JsonDict] = None,
creator_join_profile: Optional[JsonDict] = None,
- ratelimit: bool = True,
) -> Tuple[int, str, int]:
- """Sends the initial events into a new room.
+ """Sends the initial events into a new room. Sends the room creation, membership,
+ and power level events into the room sequentially, then creates and batches up the
+ rest of the events to persist as a batch to the DB.
`power_level_content_override` doesn't apply when initial state has
power level state event content.
+ Rate limiting should already have been applied by this point.
+
Returns:
A tuple containing the stream ID, event ID and depth of the last
event sent to the room.
"""
creator_id = creator.user.to_string()
-
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
-
depth = 1
- last_sent_event_id: Optional[str] = None
- def create(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
+ # the most recently created event
+ prev_event: List[str] = []
+ # a map of event types, state keys -> event_ids. We collect these mappings this as events are
+ # created (but not persisted to the db) to determine state for future created events
+ # (as this info can't be pulled from the db)
+ state_map: MutableStateMap[str] = {}
+ # current_state_group of last event created. Used for computing event context of
+ # events to be batched
+ current_state_group = None
+
+ def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
e = {"type": etype, "content": content}
e.update(event_keys)
@@ -1054,32 +1074,44 @@ class RoomCreationHandler:
return e
- async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
- nonlocal last_sent_event_id
+ async def create_event(
+ etype: str,
+ content: JsonDict,
+ for_batch: bool,
+ **kwargs: Any,
+ ) -> Tuple[EventBase, synapse.events.snapshot.EventContext]:
+ """
+ Creates an event and associated event context.
+ Args:
+ etype: the type of event to be created
+ content: content of the event
+ for_batch: whether the event is being created for batch persisting. If
+ bool for_batch is true, this will create an event using the prev_event_ids,
+ and will create an event context for the event using the parameters state_map
+ and current_state_group, thus these parameters must be provided in this
+ case if for_batch is True. The subsequently created event and context
+ are suitable for being batched up and bulk persisted to the database
+ with other similarly created events.
+ """
nonlocal depth
+ nonlocal prev_event
- event = create(etype, content, **kwargs)
- logger.debug("Sending %s in new room", etype)
- # Allow these events to be sent even if the user is shadow-banned to
- # allow the room creation to complete.
- (
- sent_event,
- last_stream_id,
- ) = await self.event_creation_handler.create_and_send_nonmember_event(
+ event_dict = create_event_dict(etype, content, **kwargs)
+
+ new_event, new_context = await self.event_creation_handler.create_event(
creator,
- event,
- ratelimit=False,
- ignore_shadow_ban=True,
- # Note: we don't pass state_event_ids here because this triggers
- # an additional query per event to look them up from the events table.
- prev_event_ids=[last_sent_event_id] if last_sent_event_id else [],
+ event_dict,
+ prev_event_ids=prev_event,
depth=depth,
+ state_map=state_map,
+ for_batch=for_batch,
+ current_state_group=current_state_group,
)
-
- last_sent_event_id = sent_event.event_id
depth += 1
+ prev_event = [new_event.event_id]
+ state_map[(new_event.type, new_event.state_key)] = new_event.event_id
- return last_stream_id
+ return new_event, new_context
try:
config = self._presets_dict[preset_config]
@@ -1089,31 +1121,55 @@ class RoomCreationHandler:
)
creation_content.update({"creator": creator_id})
- await send(etype=EventTypes.Create, content=creation_content)
+ creation_event, creation_context = await create_event(
+ EventTypes.Create, creation_content, False
+ )
logger.debug("Sending %s in new room", EventTypes.Member)
- # Room create event must exist at this point
- assert last_sent_event_id is not None
+ ev = await self.event_creation_handler.handle_new_client_event(
+ requester=creator,
+ events_and_context=[(creation_event, creation_context)],
+ ratelimit=False,
+ ignore_shadow_ban=True,
+ )
+ last_sent_event_id = ev.event_id
+
member_event_id, _ = await self.room_member_handler.update_membership(
creator,
creator.user,
room_id,
"join",
- ratelimit=ratelimit,
+ ratelimit=False,
content=creator_join_profile,
new_room=True,
prev_event_ids=[last_sent_event_id],
depth=depth,
)
- last_sent_event_id = member_event_id
+ prev_event = [member_event_id]
+
+ # update the depth and state map here as the membership event has been created
+ # through a different code path
+ depth += 1
+ state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id
+
+ # we need the state group of the membership event as it is the current state group
+ event_to_state = (
+ await self._storage_controllers.state.get_state_group_for_events(
+ [member_event_id]
+ )
+ )
+ current_state_group = event_to_state[member_event_id]
+ events_to_send = []
# We treat the power levels override specially as this needs to be one
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
if pl_content is not None:
- last_sent_stream_id = await send(
- etype=EventTypes.PowerLevels, content=pl_content
+ power_event, power_context = await create_event(
+ EventTypes.PowerLevels, pl_content, True
)
+ current_state_group = power_context._state_group
+ events_to_send.append((power_event, power_context))
else:
power_level_content: JsonDict = {
"users": {creator_id: 100},
@@ -1156,48 +1212,73 @@ class RoomCreationHandler:
# apply those.
if power_level_content_override:
power_level_content.update(power_level_content_override)
-
- last_sent_stream_id = await send(
- etype=EventTypes.PowerLevels, content=power_level_content
+ pl_event, pl_context = await create_event(
+ EventTypes.PowerLevels,
+ power_level_content,
+ True,
)
+ current_state_group = pl_context._state_group
+ events_to_send.append((pl_event, pl_context))
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
- last_sent_stream_id = await send(
- etype=EventTypes.CanonicalAlias,
- content={"alias": room_alias.to_string()},
+ room_alias_event, room_alias_context = await create_event(
+ EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
)
+ current_state_group = room_alias_context._state_group
+ events_to_send.append((room_alias_event, room_alias_context))
if (EventTypes.JoinRules, "") not in initial_state:
- last_sent_stream_id = await send(
- etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
+ join_rules_event, join_rules_context = await create_event(
+ EventTypes.JoinRules,
+ {"join_rule": config["join_rules"]},
+ True,
)
+ current_state_group = join_rules_context._state_group
+ events_to_send.append((join_rules_event, join_rules_context))
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
- last_sent_stream_id = await send(
- etype=EventTypes.RoomHistoryVisibility,
- content={"history_visibility": config["history_visibility"]},
+ visibility_event, visibility_context = await create_event(
+ EventTypes.RoomHistoryVisibility,
+ {"history_visibility": config["history_visibility"]},
+ True,
)
+ current_state_group = visibility_context._state_group
+ events_to_send.append((visibility_event, visibility_context))
if config["guest_can_join"]:
if (EventTypes.GuestAccess, "") not in initial_state:
- last_sent_stream_id = await send(
- etype=EventTypes.GuestAccess,
- content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
+ guest_access_event, guest_access_context = await create_event(
+ EventTypes.GuestAccess,
+ {EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
+ True,
)
+ current_state_group = guest_access_context._state_group
+ events_to_send.append((guest_access_event, guest_access_context))
for (etype, state_key), content in initial_state.items():
- last_sent_stream_id = await send(
- etype=etype, state_key=state_key, content=content
+ event, context = await create_event(
+ etype, content, True, state_key=state_key
)
+ current_state_group = context._state_group
+ events_to_send.append((event, context))
if config["encrypted"]:
- last_sent_stream_id = await send(
- etype=EventTypes.RoomEncryption,
+ encryption_event, encryption_context = await create_event(
+ EventTypes.RoomEncryption,
+ {"algorithm": RoomEncryptionAlgorithms.DEFAULT},
+ True,
state_key="",
- content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
)
+ events_to_send.append((encryption_event, encryption_context))
- return last_sent_stream_id, last_sent_event_id, depth
+ last_event = await self.event_creation_handler.handle_new_client_event(
+ creator,
+ events_to_send,
+ ignore_shadow_ban=True,
+ ratelimit=False,
+ )
+ assert last_event.internal_metadata.stream_ordering is not None
+ return last_event.internal_metadata.stream_ordering, last_event.event_id, depth
def _generate_room_id(self) -> str:
"""Generates a random room ID.
@@ -1279,13 +1360,16 @@ class RoomContextHandler:
"""
user = requester.user
if use_admin_priviledge:
- await assert_user_is_admin(self.auth, requester.user)
+ await assert_user_is_admin(self.auth, requester)
before_limit = math.floor(limit / 2.0)
after_limit = limit - before_limit
- users = await self.store.get_users_in_room(room_id)
- is_peeking = user.to_string() not in users
+ is_user_in_room = await self.store.check_local_user_in_room(
+ user_id=user.to_string(), room_id=room_id
+ )
+ # The user is peeking if they aren't in the room already
+ is_peeking = not is_user_in_room
async def filter_evts(events: List[EventBase]) -> List[EventBase]:
if use_admin_priviledge:
@@ -1367,7 +1451,7 @@ class RoomContextHandler:
events_before=events_before,
event=event,
events_after=events_after,
- state=await filter_evts(state_events),
+ state=state_events,
aggregations=aggregations,
start=await token.copy_and_replace(
StreamKeyType.ROOM, results.start
@@ -1413,7 +1497,12 @@ class TimestampLookupHandler:
Raises:
SynapseError if unable to find any event locally in the given direction
"""
-
+ logger.debug(
+ "get_event_for_timestamp(room_id=%s, timestamp=%s, direction=%s) Finding closest event...",
+ room_id,
+ timestamp,
+ direction,
+ )
local_event_id = await self.store.get_event_id_for_timestamp(
room_id, timestamp, direction
)
@@ -1459,90 +1548,60 @@ class TimestampLookupHandler:
timestamp,
)
- # Find other homeservers from the given state in the room
- curr_state = await self._storage_controllers.state.get_current_state(
- room_id
+ likely_domains = (
+ await self._storage_controllers.state.get_current_hosts_in_room_ordered(
+ room_id
+ )
)
- curr_domains = get_domains_from_state(curr_state)
- likely_domains = [
- domain for domain, depth in curr_domains if domain != self.server_name
- ]
- # Loop through each homeserver candidate until we get a succesful response
- for domain in likely_domains:
- try:
- remote_response = await self.federation_client.timestamp_to_event(
- domain, room_id, timestamp, direction
- )
- logger.debug(
- "get_event_for_timestamp: response from domain(%s)=%s",
- domain,
- remote_response,
- )
+ remote_response = await self.federation_client.timestamp_to_event(
+ destinations=likely_domains,
+ room_id=room_id,
+ timestamp=timestamp,
+ direction=direction,
+ )
+ if remote_response is not None:
+ logger.debug(
+ "get_event_for_timestamp: remote_response=%s",
+ remote_response,
+ )
- remote_event_id = remote_response.event_id
- remote_origin_server_ts = remote_response.origin_server_ts
-
- # Backfill this event so we can get a pagination token for
- # it with `/context` and paginate `/messages` from this
- # point.
- #
- # TODO: The requested timestamp may lie in a part of the
- # event graph that the remote server *also* didn't have,
- # in which case they will have returned another event
- # which may be nowhere near the requested timestamp. In
- # the future, we may need to reconcile that gap and ask
- # other homeservers, and/or extend `/timestamp_to_event`
- # to return events on *both* sides of the timestamp to
- # help reconcile the gap faster.
- remote_event = (
- await self.federation_event_handler.backfill_event_id(
- domain, room_id, remote_event_id
- )
- )
+ remote_event_id = remote_response.event_id
+ remote_origin_server_ts = remote_response.origin_server_ts
- # XXX: When we see that the remote server is not trustworthy,
- # maybe we should not ask them first in the future.
- if remote_origin_server_ts != remote_event.origin_server_ts:
- logger.info(
- "get_event_for_timestamp: Remote server (%s) claimed that remote_event_id=%s occured at remote_origin_server_ts=%s but that isn't true (actually occured at %s). Their claims are dubious and we should consider not trusting them.",
- domain,
- remote_event_id,
- remote_origin_server_ts,
- remote_event.origin_server_ts,
- )
-
- # Only return the remote event if it's closer than the local event
- if not local_event or (
- abs(remote_event.origin_server_ts - timestamp)
- < abs(local_event.origin_server_ts - timestamp)
- ):
- logger.info(
- "get_event_for_timestamp: returning remote_event_id=%s (%s) since it's closer to timestamp=%s than local_event=%s (%s)",
- remote_event_id,
- remote_event.origin_server_ts,
- timestamp,
- local_event.event_id if local_event else None,
- local_event.origin_server_ts if local_event else None,
- )
- return remote_event_id, remote_origin_server_ts
- except (HttpResponseException, InvalidResponseError) as ex:
- # Let's not put a high priority on some other homeserver
- # failing to respond or giving a random response
- logger.debug(
- "get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
- domain,
- type(ex).__name__,
- ex,
- ex.args,
+ # Backfill this event so we can get a pagination token for
+ # it with `/context` and paginate `/messages` from this
+ # point.
+ pulled_pdu_info = await self.federation_event_handler.backfill_event_id(
+ likely_domains, room_id, remote_event_id
+ )
+ remote_event = pulled_pdu_info.pdu
+
+ # XXX: When we see that the remote server is not trustworthy,
+ # maybe we should not ask them first in the future.
+ if remote_origin_server_ts != remote_event.origin_server_ts:
+ logger.info(
+ "get_event_for_timestamp: Remote server (%s) claimed that remote_event_id=%s occured at remote_origin_server_ts=%s but that isn't true (actually occured at %s). Their claims are dubious and we should consider not trusting them.",
+ pulled_pdu_info.pull_origin,
+ remote_event_id,
+ remote_origin_server_ts,
+ remote_event.origin_server_ts,
)
- except Exception:
- # But we do want to see some exceptions in our code
- logger.warning(
- "get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception",
- domain,
- exc_info=True,
+
+ # Only return the remote event if it's closer than the local event
+ if not local_event or (
+ abs(remote_event.origin_server_ts - timestamp)
+ < abs(local_event.origin_server_ts - timestamp)
+ ):
+ logger.info(
+ "get_event_for_timestamp: returning remote_event_id=%s (%s) since it's closer to timestamp=%s than local_event=%s (%s)",
+ remote_event_id,
+ remote_event.origin_server_ts,
+ timestamp,
+ local_event.event_id if local_event else None,
+ local_event.origin_server_ts if local_event else None,
)
+ return remote_event_id, remote_origin_server_ts
# To appease mypy, we have to add both of these conditions to check for
# `None`. We only expect `local_event` to be `None` when
@@ -1565,7 +1624,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
self,
user: UserID,
from_key: RoomStreamToken,
- limit: Optional[int],
+ limit: int,
room_ids: Collection[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
|