diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 57ab05ad25..f81241c2b3 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -49,7 +49,6 @@ from synapse.api.constants import (
from synapse.api.errors import (
AuthError,
Codes,
- HttpResponseException,
LimitExceededError,
NotFoundError,
StoreError,
@@ -60,11 +59,9 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase
from synapse.events.utils import copy_and_fixup_power_levels_contents
-from synapse.federation.federation_client import InvalidResponseError
from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
-from synapse.storage.state import StateFilter
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
@@ -79,6 +76,7 @@ from synapse.types import (
UserID,
create_requester,
)
+from synapse.types.state import StateFilter
from synapse.util import stringutils
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_and_validate_server_name
@@ -229,9 +227,7 @@ class RoomCreationHandler:
},
)
validate_event_for_room_version(tombstone_event)
- await self._event_auth_handler.check_auth_rules_from_context(
- tombstone_event, tombstone_context
- )
+ await self._event_auth_handler.check_auth_rules_from_context(tombstone_event)
# Upgrade the room
#
@@ -561,7 +557,6 @@ class RoomCreationHandler:
invite_list=[],
initial_state=initial_state,
creation_content=creation_content,
- ratelimit=False,
)
# Transfer membership events
@@ -755,6 +750,10 @@ class RoomCreationHandler:
)
if ratelimit:
+ # Rate limit once in advance, but don't rate limit the individual
+ # events in the room — room creation isn't atomic and it's very
+ # janky if half the events in the initial state don't make it because
+ # of rate limiting.
await self.request_ratelimiter.ratelimit(requester)
room_version_id = config.get(
@@ -915,7 +914,6 @@ class RoomCreationHandler:
room_alias=room_alias,
power_level_content_override=power_level_content_override,
creator_join_profile=creator_join_profile,
- ratelimit=ratelimit,
)
if "name" in config:
@@ -1039,7 +1037,6 @@ class RoomCreationHandler:
room_alias: Optional[RoomAlias] = None,
power_level_content_override: Optional[JsonDict] = None,
creator_join_profile: Optional[JsonDict] = None,
- ratelimit: bool = True,
) -> Tuple[int, str, int]:
"""Sends the initial events into a new room. Sends the room creation, membership,
and power level events into the room sequentially, then creates and batches up the
@@ -1048,6 +1045,8 @@ class RoomCreationHandler:
`power_level_content_override` doesn't apply when initial state has
power level state event content.
+ Rate limiting should already have been applied by this point.
+
Returns:
A tuple containing the stream ID, event ID and depth of the last
event sent to the room.
@@ -1057,9 +1056,6 @@ class RoomCreationHandler:
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
depth = 1
- # the last event sent/persisted to the db
- last_sent_event_id: Optional[str] = None
-
# the most recently created event
prev_event: List[str] = []
# a map of event types, state keys -> event_ids. We collect these mappings this as events are
@@ -1084,6 +1080,19 @@ class RoomCreationHandler:
for_batch: bool,
**kwargs: Any,
) -> Tuple[EventBase, synapse.events.snapshot.EventContext]:
+ """
+ Creates an event and associated event context.
+ Args:
+ etype: the type of event to be created
+ content: content of the event
+ for_batch: whether the event is being created for batch persisting. If
+ bool for_batch is true, this will create an event using the prev_event_ids,
+ and will create an event context for the event using the parameters state_map
+ and current_state_group, thus these parameters must be provided in this
+ case if for_batch is True. The subsequently created event and context
+ are suitable for being batched up and bulk persisted to the database
+ with other similarly created events.
+ """
nonlocal depth
nonlocal prev_event
@@ -1104,26 +1113,6 @@ class RoomCreationHandler:
return new_event, new_context
- async def send(
- event: EventBase,
- context: synapse.events.snapshot.EventContext,
- creator: Requester,
- ) -> int:
- nonlocal last_sent_event_id
-
- ev = await self.event_creation_handler.handle_new_client_event(
- requester=creator,
- events_and_context=[(event, context)],
- ratelimit=False,
- ignore_shadow_ban=True,
- )
-
- last_sent_event_id = ev.event_id
-
- # we know it was persisted, so must have a stream ordering
- assert ev.internal_metadata.stream_ordering
- return ev.internal_metadata.stream_ordering
-
try:
config = self._presets_dict[preset_config]
except KeyError:
@@ -1137,16 +1126,20 @@ class RoomCreationHandler:
)
logger.debug("Sending %s in new room", EventTypes.Member)
- await send(creation_event, creation_context, creator)
+ ev = await self.event_creation_handler.handle_new_client_event(
+ requester=creator,
+ events_and_context=[(creation_event, creation_context)],
+ ratelimit=False,
+ ignore_shadow_ban=True,
+ )
+ last_sent_event_id = ev.event_id
- # Room create event must exist at this point
- assert last_sent_event_id is not None
member_event_id, _ = await self.room_member_handler.update_membership(
creator,
creator.user,
room_id,
"join",
- ratelimit=ratelimit,
+ ratelimit=False,
content=creator_join_profile,
new_room=True,
prev_event_ids=[last_sent_event_id],
@@ -1159,15 +1152,24 @@ class RoomCreationHandler:
depth += 1
state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id
+ # we need the state group of the membership event as it is the current state group
+ event_to_state = (
+ await self._storage_controllers.state.get_state_group_for_events(
+ [member_event_id]
+ )
+ )
+ current_state_group = event_to_state[member_event_id]
+
+ events_to_send = []
# We treat the power levels override specially as this needs to be one
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
if pl_content is not None:
power_event, power_context = await create_event(
- EventTypes.PowerLevels, pl_content, False
+ EventTypes.PowerLevels, pl_content, True
)
current_state_group = power_context._state_group
- await send(power_event, power_context, creator)
+ events_to_send.append((power_event, power_context))
else:
power_level_content: JsonDict = {
"users": {creator_id: 100},
@@ -1213,12 +1215,11 @@ class RoomCreationHandler:
pl_event, pl_context = await create_event(
EventTypes.PowerLevels,
power_level_content,
- False,
+ True,
)
current_state_group = pl_context._state_group
- await send(pl_event, pl_context, creator)
+ events_to_send.append((pl_event, pl_context))
- events_to_send = []
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
room_alias_event, room_alias_context = await create_event(
EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
@@ -1271,7 +1272,10 @@ class RoomCreationHandler:
events_to_send.append((encryption_event, encryption_context))
last_event = await self.event_creation_handler.handle_new_client_event(
- creator, events_to_send, ignore_shadow_ban=True
+ creator,
+ events_to_send,
+ ignore_shadow_ban=True,
+ ratelimit=False,
)
assert last_event.internal_metadata.stream_ordering is not None
return last_event.internal_metadata.stream_ordering, last_event.event_id, depth
@@ -1447,7 +1451,7 @@ class RoomContextHandler:
events_before=events_before,
event=event,
events_after=events_after,
- state=await filter_evts(state_events),
+ state=state_events,
aggregations=aggregations,
start=await token.copy_and_replace(
StreamKeyType.ROOM, results.start
@@ -1493,7 +1497,12 @@ class TimestampLookupHandler:
Raises:
SynapseError if unable to find any event locally in the given direction
"""
-
+ logger.debug(
+ "get_event_for_timestamp(room_id=%s, timestamp=%s, direction=%s) Finding closest event...",
+ room_id,
+ timestamp,
+ direction,
+ )
local_event_id = await self.store.get_event_id_for_timestamp(
room_id, timestamp, direction
)
@@ -1545,85 +1554,54 @@ class TimestampLookupHandler:
)
)
- # Loop through each homeserver candidate until we get a succesful response
- for domain in likely_domains:
- # We don't want to ask our own server for information we don't have
- if domain == self.server_name:
- continue
+ remote_response = await self.federation_client.timestamp_to_event(
+ destinations=likely_domains,
+ room_id=room_id,
+ timestamp=timestamp,
+ direction=direction,
+ )
+ if remote_response is not None:
+ logger.debug(
+ "get_event_for_timestamp: remote_response=%s",
+ remote_response,
+ )
- try:
- remote_response = await self.federation_client.timestamp_to_event(
- domain, room_id, timestamp, direction
- )
- logger.debug(
- "get_event_for_timestamp: response from domain(%s)=%s",
- domain,
- remote_response,
- )
+ remote_event_id = remote_response.event_id
+ remote_origin_server_ts = remote_response.origin_server_ts
- remote_event_id = remote_response.event_id
- remote_origin_server_ts = remote_response.origin_server_ts
-
- # Backfill this event so we can get a pagination token for
- # it with `/context` and paginate `/messages` from this
- # point.
- #
- # TODO: The requested timestamp may lie in a part of the
- # event graph that the remote server *also* didn't have,
- # in which case they will have returned another event
- # which may be nowhere near the requested timestamp. In
- # the future, we may need to reconcile that gap and ask
- # other homeservers, and/or extend `/timestamp_to_event`
- # to return events on *both* sides of the timestamp to
- # help reconcile the gap faster.
- remote_event = (
- await self.federation_event_handler.backfill_event_id(
- domain, room_id, remote_event_id
- )
- )
+ # Backfill this event so we can get a pagination token for
+ # it with `/context` and paginate `/messages` from this
+ # point.
+ pulled_pdu_info = await self.federation_event_handler.backfill_event_id(
+ likely_domains, room_id, remote_event_id
+ )
+ remote_event = pulled_pdu_info.pdu
- # XXX: When we see that the remote server is not trustworthy,
- # maybe we should not ask them first in the future.
- if remote_origin_server_ts != remote_event.origin_server_ts:
- logger.info(
- "get_event_for_timestamp: Remote server (%s) claimed that remote_event_id=%s occured at remote_origin_server_ts=%s but that isn't true (actually occured at %s). Their claims are dubious and we should consider not trusting them.",
- domain,
- remote_event_id,
- remote_origin_server_ts,
- remote_event.origin_server_ts,
- )
-
- # Only return the remote event if it's closer than the local event
- if not local_event or (
- abs(remote_event.origin_server_ts - timestamp)
- < abs(local_event.origin_server_ts - timestamp)
- ):
- logger.info(
- "get_event_for_timestamp: returning remote_event_id=%s (%s) since it's closer to timestamp=%s than local_event=%s (%s)",
- remote_event_id,
- remote_event.origin_server_ts,
- timestamp,
- local_event.event_id if local_event else None,
- local_event.origin_server_ts if local_event else None,
- )
- return remote_event_id, remote_origin_server_ts
- except (HttpResponseException, InvalidResponseError) as ex:
- # Let's not put a high priority on some other homeserver
- # failing to respond or giving a random response
- logger.debug(
- "get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
- domain,
- type(ex).__name__,
- ex,
- ex.args,
+ # XXX: When we see that the remote server is not trustworthy,
+ # maybe we should not ask them first in the future.
+ if remote_origin_server_ts != remote_event.origin_server_ts:
+ logger.info(
+ "get_event_for_timestamp: Remote server (%s) claimed that remote_event_id=%s occured at remote_origin_server_ts=%s but that isn't true (actually occured at %s). Their claims are dubious and we should consider not trusting them.",
+ pulled_pdu_info.pull_origin,
+ remote_event_id,
+ remote_origin_server_ts,
+ remote_event.origin_server_ts,
)
- except Exception:
- # But we do want to see some exceptions in our code
- logger.warning(
- "get_event_for_timestamp: Failed to fetch /timestamp_to_event from %s because of exception",
- domain,
- exc_info=True,
+
+ # Only return the remote event if it's closer than the local event
+ if not local_event or (
+ abs(remote_event.origin_server_ts - timestamp)
+ < abs(local_event.origin_server_ts - timestamp)
+ ):
+ logger.info(
+ "get_event_for_timestamp: returning remote_event_id=%s (%s) since it's closer to timestamp=%s than local_event=%s (%s)",
+ remote_event_id,
+ remote_event.origin_server_ts,
+ timestamp,
+ local_event.event_id if local_event else None,
+ local_event.origin_server_ts if local_event else None,
)
+ return remote_event_id, remote_origin_server_ts
# To appease mypy, we have to add both of these conditions to check for
# `None`. We only expect `local_event` to be `None` when
@@ -1646,7 +1624,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
self,
user: UserID,
from_key: RoomStreamToken,
- limit: Optional[int],
+ limit: int,
room_ids: Collection[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
|