diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 67e789eef7..797de46dbc 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -343,10 +343,12 @@ class AccountDataEventSource(EventSource[int, JsonDict]):
}
)
- (
- account_data,
- room_account_data,
- ) = await self.store.get_updated_account_data_for_user(user_id, last_stream_id)
+ account_data = await self.store.get_updated_global_account_data_for_user(
+ user_id, last_stream_id
+ )
+ room_account_data = await self.store.get_updated_room_account_data_for_user(
+ user_id, last_stream_id
+ )
for account_data_type, content in account_data.items():
results.append({"type": account_data_type, "content": content})
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 30f2d46c3c..57a6854b1e 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1593,9 +1593,8 @@ class AuthHandler:
if medium == "email":
address = canonicalise_email(address)
- identity_handler = self.hs.get_identity_handler()
- result = await identity_handler.try_unbind_threepid(
- user_id, {"medium": medium, "address": address, "id_server": id_server}
+ result = await self.hs.get_identity_handler().try_unbind_threepid(
+ user_id, medium, address, id_server
)
await self.store.user_delete_threepid(user_id, medium, address)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index d74d135c0c..d24f649382 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -106,12 +106,7 @@ class DeactivateAccountHandler:
for threepid in threepids:
try:
result = await self._identity_handler.try_unbind_threepid(
- user_id,
- {
- "medium": threepid["medium"],
- "address": threepid["address"],
- "id_server": id_server,
- },
+ user_id, threepid["medium"], threepid["address"], id_server
)
identity_server_supports_unbinding &= result
except Exception:
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 2ea52257cb..a5798e9483 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -14,7 +14,7 @@
import logging
import string
-from typing import TYPE_CHECKING, Iterable, List, Optional
+from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence
from typing_extensions import Literal
@@ -485,7 +485,8 @@ class DirectoryHandler:
)
)
if canonical_alias:
- room_aliases.append(canonical_alias)
+ # Ensure we do not mutate room_aliases.
+ room_aliases = list(room_aliases) + [canonical_alias]
if not self.config.roomdirectory.is_publishing_room_allowed(
user_id, room_id, room_aliases
@@ -528,7 +529,7 @@ class DirectoryHandler:
async def get_aliases_for_room(
self, requester: Requester, room_id: str
- ) -> List[str]:
+ ) -> Sequence[str]:
"""
Get a list of the aliases that currently point to this room on this server
"""
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d2188ca08f..43cbece21b 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -159,19 +159,22 @@ class E2eKeysHandler:
# A map of destination -> user ID -> device IDs.
remote_queries_not_in_cache: Dict[str, Dict[str, Iterable[str]]] = {}
if remote_queries:
- query_list: List[Tuple[str, Optional[str]]] = []
+ user_ids = set()
+ user_and_device_ids: List[Tuple[str, str]] = []
for user_id, device_ids in remote_queries.items():
if device_ids:
- query_list.extend(
+ user_and_device_ids.extend(
(user_id, device_id) for device_id in device_ids
)
else:
- query_list.append((user_id, None))
+ user_ids.add(user_id)
(
user_ids_not_in_cache,
remote_results,
- ) = await self.store.get_user_devices_from_cache(query_list)
+ ) = await self.store.get_user_devices_from_cache(
+ user_ids, user_and_device_ids
+ )
# Check that the homeserver still shares a room with all cached users.
# Note that this check may be slightly racy when a remote user leaves a
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index a23a8ce2a1..46dd63c3f0 100644
--- a/synapse/handlers/event_auth.py
+++ b/synapse/handlers/event_auth.py
@@ -202,7 +202,7 @@ class EventAuthHandler:
state_ids: StateMap[str],
room_version: RoomVersion,
user_id: str,
- prev_member_event: Optional[EventBase],
+ prev_membership: Optional[str],
) -> None:
"""
Check whether a user can join a room without an invite due to restricted join rules.
@@ -214,15 +214,14 @@ class EventAuthHandler:
state_ids: The state of the room as it currently is.
room_version: The room version of the room being joined.
user_id: The user joining the room.
- prev_member_event: The current membership event for this user.
+ prev_membership: The current membership state for this user. `None` if the
+ user has never joined the room (equivalent to "leave").
Raises:
AuthError if the user cannot join the room.
"""
# If the member is invited or currently joined, then nothing to do.
- if prev_member_event and (
- prev_member_event.membership in (Membership.JOIN, Membership.INVITE)
- ):
+ if prev_membership in (Membership.JOIN, Membership.INVITE):
return
# This is not a room with a restricted join rule, so we don't need to do the
@@ -255,13 +254,14 @@ class EventAuthHandler:
)
async def has_restricted_join_rules(
- self, state_ids: StateMap[str], room_version: RoomVersion
+ self, partial_state_ids: StateMap[str], room_version: RoomVersion
) -> bool:
"""
Return if the room has the proper join rules set for access via rooms.
Args:
- state_ids: The state of the room as it currently is.
+ partial_state_ids: The state of the room as it currently is. May be full or partial
+ state.
room_version: The room version of the room to query.
Returns:
@@ -272,7 +272,7 @@ class EventAuthHandler:
return False
# If there's no join rule, then it defaults to invite (so this doesn't apply).
- join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None)
+ join_rules_event_id = partial_state_ids.get((EventTypes.JoinRules, ""), None)
if not join_rules_event_id:
return False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7f64130e0a..08727e4857 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -49,6 +49,7 @@ from synapse.api.errors import (
FederationPullAttemptBackoffError,
HttpResponseException,
NotFoundError,
+ PartialStateConflictError,
RequestSendFailed,
SynapseError,
)
@@ -56,7 +57,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.crypto.event_signing import compute_event_signature
from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase
-from synapse.events.snapshot import EventContext
+from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
from synapse.http.servlet import assert_params_in_dict
@@ -68,7 +69,6 @@ from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationStoreRoomOnOutlierMembershipRestServlet,
)
-from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import JsonDict, StrCollection, get_domain_from_id
from synapse.types.state import StateFilter
@@ -990,7 +990,10 @@ class FederationHandler:
)
try:
- event, context = await self.event_creation_handler.create_new_client_event(
+ (
+ event,
+ unpersisted_context,
+ ) = await self.event_creation_handler.create_new_client_event(
builder=builder
)
except SynapseError as e:
@@ -998,7 +1001,9 @@ class FederationHandler:
raise
# Ensure the user can even join the room.
- await self._federation_event_handler.check_join_restrictions(context, event)
+ await self._federation_event_handler.check_join_restrictions(
+ unpersisted_context, event
+ )
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_join_request`
@@ -1178,7 +1183,7 @@ class FederationHandler:
},
)
- event, context = await self.event_creation_handler.create_new_client_event(
+ event, _ = await self.event_creation_handler.create_new_client_event(
builder=builder
)
@@ -1228,12 +1233,13 @@ class FederationHandler:
},
)
- event, context = await self.event_creation_handler.create_new_client_event(
- builder=builder
- )
+ (
+ event,
+ unpersisted_context,
+ ) = await self.event_creation_handler.create_new_client_event(builder=builder)
event_allowed, _ = await self.third_party_event_rules.check_event_allowed(
- event, context
+ event, unpersisted_context
)
if not event_allowed:
logger.warning("Creation of knock %s forbidden by third-party rules", event)
@@ -1406,15 +1412,20 @@ class FederationHandler:
try:
(
event,
- context,
+ unpersisted_context,
) = await self.event_creation_handler.create_new_client_event(
builder=builder
)
- event, context = await self.add_display_name_to_third_party_invite(
- room_version_obj, event_dict, event, context
+ (
+ event,
+ unpersisted_context,
+ ) = await self.add_display_name_to_third_party_invite(
+ room_version_obj, event_dict, event, unpersisted_context
)
+ context = await unpersisted_context.persist(event)
+
EventValidator().validate_new(event, self.config)
# We need to tell the transaction queue to send this out, even
@@ -1483,14 +1494,19 @@ class FederationHandler:
try:
(
event,
- context,
+ unpersisted_context,
) = await self.event_creation_handler.create_new_client_event(
builder=builder
)
- event, context = await self.add_display_name_to_third_party_invite(
- room_version_obj, event_dict, event, context
+ (
+ event,
+ unpersisted_context,
+ ) = await self.add_display_name_to_third_party_invite(
+ room_version_obj, event_dict, event, unpersisted_context
)
+ context = await unpersisted_context.persist(event)
+
try:
validate_event_for_room_version(event)
await self._event_auth_handler.check_auth_rules_from_context(event)
@@ -1522,8 +1538,8 @@ class FederationHandler:
room_version_obj: RoomVersion,
event_dict: JsonDict,
event: EventBase,
- context: EventContext,
- ) -> Tuple[EventBase, EventContext]:
+ context: UnpersistedEventContextBase,
+ ) -> Tuple[EventBase, UnpersistedEventContextBase]:
key = (
EventTypes.ThirdPartyInvite,
event.content["third_party_invite"]["signed"]["token"],
@@ -1557,11 +1573,14 @@ class FederationHandler:
room_version_obj, event_dict
)
EventValidator().validate_builder(builder)
- event, context = await self.event_creation_handler.create_new_client_event(
- builder=builder
- )
+
+ (
+ event,
+ unpersisted_context,
+ ) = await self.event_creation_handler.create_new_client_event(builder=builder)
+
EventValidator().validate_new(event, self.config)
- return event, context
+ return event, unpersisted_context
async def _check_signature(self, event: EventBase, context: EventContext) -> None:
"""
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index e037acbca2..b7136f8d1c 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -47,6 +47,7 @@ from synapse.api.errors import (
FederationError,
FederationPullAttemptBackoffError,
HttpResponseException,
+ PartialStateConflictError,
RequestSendFailed,
SynapseError,
)
@@ -58,7 +59,7 @@ from synapse.event_auth import (
validate_event_for_room_version,
)
from synapse.events import EventBase
-from synapse.events.snapshot import EventContext
+from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import (
@@ -74,7 +75,6 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.state import StateResolutionStore
-from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
PersistedEventPosition,
@@ -426,7 +426,9 @@ class FederationEventHandler:
return event, context
async def check_join_restrictions(
- self, context: EventContext, event: EventBase
+ self,
+ context: UnpersistedEventContextBase,
+ event: EventBase,
) -> None:
"""Check that restrictions in restricted join rules are matched
@@ -439,16 +441,17 @@ class FederationEventHandler:
# Check if the user is already in the room or invited to the room.
user_id = event.state_key
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
- prev_member_event = None
+ prev_membership = None
if prev_member_event_id:
prev_member_event = await self._store.get_event(prev_member_event_id)
+ prev_membership = prev_member_event.membership
# Check if the member should be allowed access via membership in a space.
await self._event_auth_handler.check_restricted_join_rules(
prev_state_ids,
event.room_version,
user_id,
- prev_member_event,
+ prev_membership,
)
@trace
@@ -524,11 +527,57 @@ class FederationEventHandler:
"Peristing join-via-remote %s (partial_state: %s)", event, partial_state
)
with nested_logging_context(suffix=event.event_id):
+ if partial_state:
+ # When handling a second partial state join into a partial state room,
+ # the returned state will exclude the membership from the first join. To
+ # preserve prior memberships, we try to compute the partial state before
+ # the event ourselves if we know about any of the prev events.
+ #
+ # When we don't know about any of the prev events, it's fine to just use
+ # the returned state, since the new join will create a new forward
+ # extremity, and leave the forward extremity containing our prior
+ # memberships alone.
+ prev_event_ids = set(event.prev_event_ids())
+ seen_event_ids = await self._store.have_events_in_timeline(
+ prev_event_ids
+ )
+ missing_event_ids = prev_event_ids - seen_event_ids
+
+ state_maps_to_resolve: List[StateMap[str]] = []
+
+ # Fetch the state after the prev events that we know about.
+ state_maps_to_resolve.extend(
+ (
+ await self._state_storage_controller.get_state_groups_ids(
+ room_id, seen_event_ids, await_full_state=False
+ )
+ ).values()
+ )
+
+ # When there are prev events we do not have the state for, we state
+ # resolve with the state returned by the remote homeserver.
+ if missing_event_ids or len(state_maps_to_resolve) == 0:
+ state_maps_to_resolve.append(
+ {(e.type, e.state_key): e.event_id for e in state}
+ )
+
+ state_ids_before_event = (
+ await self._state_resolution_handler.resolve_events_with_store(
+ event.room_id,
+ room_version.identifier,
+ state_maps_to_resolve,
+ event_map=None,
+ state_res_store=StateResolutionStore(self._store),
+ )
+ )
+ else:
+ state_ids_before_event = {
+ (e.type, e.state_key): e.event_id for e in state
+ }
+
context = await self._state_handler.compute_event_context(
event,
- state_ids_before_event={
- (e.type, e.state_key): e.event_id for e in state
- },
+ state_ids_before_event=state_ids_before_event,
partial_state=partial_state,
)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 848e46eb9b..bf0f7acf80 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -219,28 +219,31 @@ class IdentityHandler:
data = json_decoder.decode(e.msg) # XXX WAT?
return data
- async def try_unbind_threepid(self, mxid: str, threepid: dict) -> bool:
- """Attempt to remove a 3PID from an identity server, or if one is not provided, all
- identity servers we're aware the binding is present on
+ async def try_unbind_threepid(
+ self, mxid: str, medium: str, address: str, id_server: Optional[str]
+ ) -> bool:
+ """Attempt to remove a 3PID from one or more identity servers.
Args:
mxid: Matrix user ID of binding to be removed
- threepid: Dict with medium & address of binding to be
- removed, and an optional id_server.
+ medium: The medium of the third-party ID.
+ address: The address of the third-party ID.
+ id_server: An identity server to attempt to unbind from. If None,
+ attempt to remove the association from all identity servers
+ known to potentially have it.
Raises:
- SynapseError: If we failed to contact the identity server
+ SynapseError: If we failed to contact one or more identity servers.
Returns:
- True on success, otherwise False if the identity
- server doesn't support unbinding (or no identity server found to
- contact).
+ True on success, otherwise False if the identity server doesn't
+ support unbinding (or no identity server to contact was found).
"""
- if threepid.get("id_server"):
- id_servers = [threepid["id_server"]]
+ if id_server:
+ id_servers = [id_server]
else:
id_servers = await self.store.get_id_servers_user_bound(
- user_id=mxid, medium=threepid["medium"], address=threepid["address"]
+ mxid, medium, address
)
# We don't know where to unbind, so we don't have a choice but to return
@@ -249,20 +252,21 @@ class IdentityHandler:
changed = True
for id_server in id_servers:
- changed &= await self.try_unbind_threepid_with_id_server(
- mxid, threepid, id_server
+ changed &= await self._try_unbind_threepid_with_id_server(
+ mxid, medium, address, id_server
)
return changed
- async def try_unbind_threepid_with_id_server(
- self, mxid: str, threepid: dict, id_server: str
+ async def _try_unbind_threepid_with_id_server(
+ self, mxid: str, medium: str, address: str, id_server: str
) -> bool:
"""Removes a binding from an identity server
Args:
mxid: Matrix user ID of binding to be removed
- threepid: Dict with medium & address of binding to be removed
+ medium: The medium of the third-party ID
+ address: The address of the third-party ID
id_server: Identity server to unbind from
Raises:
@@ -286,7 +290,7 @@ class IdentityHandler:
content = {
"mxid": mxid,
- "threepid": {"medium": threepid["medium"], "address": threepid["address"]},
+ "threepid": {"medium": medium, "address": address},
}
# we abuse the federation http client to sign the request, but we have to send it
@@ -319,12 +323,7 @@ class IdentityHandler:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
- await self.store.remove_user_bound_threepid(
- user_id=mxid,
- medium=threepid["medium"],
- address=threepid["address"],
- id_server=id_server,
- )
+ await self.store.remove_user_bound_threepid(mxid, medium, address, id_server)
return changed
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 191529bd8e..1a29abde98 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -154,9 +154,8 @@ class InitialSyncHandler:
tags_by_room = await self.store.get_tags_for_user(user_id)
- account_data, account_data_by_room = await self.store.get_account_data_for_user(
- user_id
- )
+ account_data = await self.store.get_global_account_data_for_user(user_id)
+ account_data_by_room = await self.store.get_room_account_data_for_user(user_id)
public_room_ids = await self.store.get_public_room_ids()
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e688e00575..8f5b658d9d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -38,6 +38,7 @@ from synapse.api.errors import (
Codes,
ConsentNotGivenError,
NotFoundError,
+ PartialStateConflictError,
ShadowBanError,
SynapseError,
UnstableSpecAuthError,
@@ -48,7 +49,7 @@ from synapse.api.urls import ConsentURIBuilder
from synapse.event_auth import validate_event_for_room_version
from synapse.events import EventBase, relation_from_event
from synapse.events.builder import EventBuilder
-from synapse.events.snapshot import EventContext
+from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.utils import maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
@@ -57,7 +58,6 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
-from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
MutableStateMap,
@@ -499,9 +499,9 @@ class EventCreationHandler:
self.request_ratelimiter = hs.get_request_ratelimiter()
- # We arbitrarily limit concurrent event creation for a room to 5.
- # This is to stop us from diverging history *too* much.
- self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
+ # We limit concurrent event creation for a room to 1. This prevents state resolution
+ # from occurring when sending bursts of events to a local room.
+ self.limiter = Linearizer(max_count=1, name="room_event_creation_limit")
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
@@ -708,7 +708,7 @@ class EventCreationHandler:
builder.internal_metadata.historical = historical
- event, context = await self.create_new_client_event(
+ event, unpersisted_context = await self.create_new_client_event(
builder=builder,
requester=requester,
allow_no_prev_events=allow_no_prev_events,
@@ -721,6 +721,8 @@ class EventCreationHandler:
current_state_group=current_state_group,
)
+ context = await unpersisted_context.persist(event)
+
# In an ideal world we wouldn't need the second part of this condition. However,
# this behaviour isn't spec'd yet, meaning we should be able to deactivate this
# behaviour. Another reason is that this code is also evaluated each time a new
@@ -1083,13 +1085,14 @@ class EventCreationHandler:
state_map: Optional[StateMap[str]] = None,
for_batch: bool = False,
current_state_group: Optional[int] = None,
- ) -> Tuple[EventBase, EventContext]:
+ ) -> Tuple[EventBase, UnpersistedEventContextBase]:
"""Create a new event for a local client. If bool for_batch is true, will
create an event using the prev_event_ids, and will create an event context for
the event using the parameters state_map and current_state_group, thus these parameters
must be provided in this case if for_batch is True. The subsequently created event
and context are suitable for being batched up and bulk persisted to the database
- with other similarly created events.
+ with other similarly created events. Note that this returns an UnpersistedEventContext,
+ which must be converted to an EventContext before it can be sent to the DB.
Args:
builder:
@@ -1131,7 +1134,7 @@ class EventCreationHandler:
batch persisting
Returns:
- Tuple of created event, context
+ Tuple of created event, UnpersistedEventContext
"""
# Strip down the state_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
@@ -1192,9 +1195,16 @@ class EventCreationHandler:
event = await builder.build(
prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
)
- context = await self.state.compute_event_context_for_batched(
- event, state_map, current_state_group
+
+ context: UnpersistedEventContextBase = (
+ await self.state.calculate_context_info(
+ event,
+ state_ids_before_event=state_map,
+ partial_state=False,
+ state_group_before_event=current_state_group,
+ )
)
+
else:
event = await builder.build(
prev_event_ids=prev_event_ids,
@@ -1244,16 +1254,17 @@ class EventCreationHandler:
state_map_for_event[(data.event_type, data.state_key)] = state_id
- context = await self.state.compute_event_context(
+ # TODO(faster_joins): check how MSC2716 works and whether we can have
+ # partial state here
+ # https://github.com/matrix-org/synapse/issues/13003
+ context = await self.state.calculate_context_info(
event,
state_ids_before_event=state_map_for_event,
- # TODO(faster_joins): check how MSC2716 works and whether we can have
- # partial state here
- # https://github.com/matrix-org/synapse/issues/13003
partial_state=False,
)
+
else:
- context = await self.state.compute_event_context(event)
+ context = await self.state.calculate_context_info(event)
if requester:
context.app_service = requester.app_service
@@ -2082,9 +2093,9 @@ class EventCreationHandler:
async def _rebuild_event_after_third_party_rules(
self, third_party_result: dict, original_event: EventBase
- ) -> Tuple[EventBase, EventContext]:
+ ) -> Tuple[EventBase, UnpersistedEventContextBase]:
# the third_party_event_rules want to replace the event.
- # we do some basic checks, and then return the replacement event and context.
+ # we do some basic checks, and then return the replacement event.
# Construct a new EventBuilder and validate it, which helps with the
# rest of these checks.
@@ -2138,5 +2149,6 @@ class EventCreationHandler:
# we rebuild the event context, to be on the safe side. If nothing else,
# delta_ids might need an update.
- context = await self.state.compute_event_context(event)
+ context = await self.state.calculate_context_info(event)
+
return event, context
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 04c61ae3dd..2bacdebfb5 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
+from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Tuple
from synapse.api.constants import EduTypes, ReceiptTypes
from synapse.appservice import ApplicationService
@@ -189,7 +189,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
@staticmethod
def filter_out_private_receipts(
- rooms: List[JsonDict], user_id: str
+ rooms: Sequence[JsonDict], user_id: str
) -> List[JsonDict]:
"""
Filters a list of serialized receipts (as returned by /sync and /initialSync)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7ba7c4ff07..837dabb3b7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -43,6 +43,7 @@ from synapse.api.errors import (
Codes,
LimitExceededError,
NotFoundError,
+ PartialStateConflictError,
StoreError,
SynapseError,
)
@@ -54,7 +55,6 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
-from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
@@ -1076,7 +1076,7 @@ class RoomCreationHandler:
state_map: MutableStateMap[str] = {}
# current_state_group of last event created. Used for computing event context of
# events to be batched
- current_state_group = None
+ current_state_group: Optional[int] = None
def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
e = {"type": etype, "content": content}
@@ -1928,6 +1928,6 @@ class RoomShutdownHandler:
return {
"kicked_users": kicked_users,
"failed_to_kick_users": failed_to_kick_users,
- "local_aliases": aliases_for_room,
+ "local_aliases": list(aliases_for_room),
"new_room_id": new_room_id,
}
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index d236cc09b5..a965c7ec76 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -26,7 +26,13 @@ from synapse.api.constants import (
GuestAccess,
Membership,
)
-from synapse.api.errors import AuthError, Codes, ShadowBanError, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ Codes,
+ PartialStateConflictError,
+ ShadowBanError,
+ SynapseError,
+)
from synapse.api.ratelimiting import Ratelimiter
from synapse.event_auth import get_named_level, get_power_level_event
from synapse.events import EventBase
@@ -34,7 +40,6 @@ from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.logging import opentracing
from synapse.module_api import NOT_SPAM
-from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.types import (
JsonDict,
Requester,
@@ -56,6 +61,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+class NoKnownServersError(SynapseError):
+ """No server already resident to the room was provided to the join/knock operation."""
+
+ def __init__(self, msg: str = "No known servers"):
+ super().__init__(404, msg)
+
+
class RoomMemberHandler(metaclass=abc.ABCMeta):
# TODO(paul): This handler currently contains a messy conflation of
# low-level API that works on UserID objects and so on, and REST-level
@@ -185,6 +197,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
room_id: Room that we are trying to join
user: User who is trying to join
content: A dict that should be used as the content of the join event.
+
+ Raises:
+ NoKnownServersError: if remote_room_hosts does not contain a server joined to
+ the room.
"""
raise NotImplementedError()
@@ -484,7 +500,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
user_id: The user's ID.
"""
# Retrieve user account data for predecessor room
- user_account_data, _ = await self.store.get_account_data_for_user(user_id)
+ user_account_data = await self.store.get_global_account_data_for_user(user_id)
# Copy direct message state if applicable
direct_rooms = user_account_data.get(AccountDataTypes.DIRECT, {})
@@ -823,14 +839,19 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
latest_event_ids = await self.store.get_prev_events_for_room(room_id)
- state_before_join = await self.state_handler.compute_state_after_events(
- room_id, latest_event_ids
+ is_partial_state_room = await self.store.is_partial_state_room(room_id)
+ partial_state_before_join = await self.state_handler.compute_state_after_events(
+ room_id, latest_event_ids, await_full_state=False
)
+ # `is_partial_state_room` also indicates whether `partial_state_before_join` is
+ # partial.
# TODO: Refactor into dictionary of explicitly allowed transitions
# between old and new state, with specific error messages for some
# transitions and generic otherwise
- old_state_id = state_before_join.get((EventTypes.Member, target.to_string()))
+ old_state_id = partial_state_before_join.get(
+ (EventTypes.Member, target.to_string())
+ )
if old_state_id:
old_state = await self.store.get_event(old_state_id, allow_none=True)
old_membership = old_state.content.get("membership") if old_state else None
@@ -881,11 +902,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if action == "kick":
raise AuthError(403, "The target user is not in the room")
- is_host_in_room = await self._is_host_in_room(state_before_join)
+ is_host_in_room = await self._is_host_in_room(partial_state_before_join)
if effective_membership_state == Membership.JOIN:
if requester.is_guest:
- guest_can_join = await self._can_guest_join(state_before_join)
+ guest_can_join = await self._can_guest_join(partial_state_before_join)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
@@ -927,8 +948,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
room_id,
remote_room_hosts,
content,
+ is_partial_state_room,
is_host_in_room,
- state_before_join,
+ partial_state_before_join,
)
if remote_join:
if ratelimit:
@@ -1073,8 +1095,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
room_id: str,
remote_room_hosts: List[str],
content: JsonDict,
+ is_partial_state_room: bool,
is_host_in_room: bool,
- state_before_join: StateMap[str],
+ partial_state_before_join: StateMap[str],
) -> Tuple[bool, List[str]]:
"""
Check whether the server should do a remote join (as opposed to a local
@@ -1093,9 +1116,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
remote_room_hosts: A list of remote room hosts.
content: The content to use as the event body of the join. This may
be modified.
- is_host_in_room: True if the host is in the room.
- state_before_join: The state before the join event (i.e. the resolution of
- the states after its parent events).
+ is_partial_state_room: `True` if the server currently doesn't hold the full
+ state of the room.
+ is_host_in_room: `True` if the host is in the room.
+ partial_state_before_join: The state before the join event (i.e. the
+ resolution of the states after its parent events). May be full or
+ partial state, depending on `is_partial_state_room`.
Returns:
A tuple of:
@@ -1109,6 +1135,23 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if not is_host_in_room:
return True, remote_room_hosts
+ prev_member_event_id = partial_state_before_join.get(
+ (EventTypes.Member, user_id), None
+ )
+ previous_membership = None
+ if prev_member_event_id:
+ prev_member_event = await self.store.get_event(prev_member_event_id)
+ previous_membership = prev_member_event.membership
+
+ # If we are not fully joined yet, and the target is not already in the room,
+ # let's do a remote join so that another server with the full state can
+ # validate that the user has not been banned, for example.
+ # We could just accept the join and wait for state res to sort it out later,
+ # but we would then leak room history to this person in the meantime, which
+ # is pretty bad.
+ if is_partial_state_room and previous_membership != Membership.JOIN:
+ return True, remote_room_hosts
+
# If the host is in the room, but not one of the authorised hosts
# for restricted join rules, a remote join must be used.
room_version = await self.store.get_room_version(room_id)
@@ -1116,21 +1159,19 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# If restricted join rules are not being used, a local join can always
# be used.
if not await self.event_auth_handler.has_restricted_join_rules(
- state_before_join, room_version
+ partial_state_before_join, room_version
):
return False, []
# If the user is invited to the room or already joined, the join
# event can always be issued locally.
- prev_member_event_id = state_before_join.get((EventTypes.Member, user_id), None)
- prev_member_event = None
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- if prev_member_event.membership in (
- Membership.JOIN,
- Membership.INVITE,
- ):
- return False, []
+ if previous_membership in (Membership.JOIN, Membership.INVITE):
+ return False, []
+
+ # All the partial state cases are covered above. We have been given the full
+ # state of the room.
+ assert not is_partial_state_room
+ state_before_join = partial_state_before_join
# If the local host has a user who can issue invites, then a local
# join can be done.
@@ -1154,7 +1195,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# Ensure the member should be allowed access via membership in a room.
await self.event_auth_handler.check_restricted_join_rules(
- state_before_join, room_version, user_id, prev_member_event
+ state_before_join, room_version, user_id, previous_membership
)
# If this is going to be a local join, additional information must
@@ -1304,11 +1345,17 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target_user, room_id)
- async def _can_guest_join(self, current_state_ids: StateMap[str]) -> bool:
+ async def _can_guest_join(self, partial_current_state_ids: StateMap[str]) -> bool:
"""
Returns whether a guest can join a room based on its current state.
+
+ Args:
+ partial_current_state_ids: The current state of the room. May be full or
+ partial state.
"""
- guest_access_id = current_state_ids.get((EventTypes.GuestAccess, ""), None)
+ guest_access_id = partial_current_state_ids.get(
+ (EventTypes.GuestAccess, ""), None
+ )
if not guest_access_id:
return False
@@ -1634,19 +1681,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
)
return event, stream_id
- async def _is_host_in_room(self, current_state_ids: StateMap[str]) -> bool:
+ async def _is_host_in_room(self, partial_current_state_ids: StateMap[str]) -> bool:
+ """Returns whether the homeserver is in the room based on its current state.
+
+ Args:
+ partial_current_state_ids: The current state of the room. May be full or
+ partial state.
+ """
# Have we just created the room, and is this about to be the very
# first member event?
- create_event_id = current_state_ids.get(("m.room.create", ""))
- if len(current_state_ids) == 1 and create_event_id:
+ create_event_id = partial_current_state_ids.get(("m.room.create", ""))
+ if len(partial_current_state_ids) == 1 and create_event_id:
# We can only get here if we're in the process of creating the room
return True
- for etype, state_key in current_state_ids:
+ for etype, state_key in partial_current_state_ids:
if etype != EventTypes.Member or not self.hs.is_mine_id(state_key):
continue
- event_id = current_state_ids[(etype, state_key)]
+ event_id = partial_current_state_ids[(etype, state_key)]
event = await self.store.get_event(event_id, allow_none=True)
if not event:
continue
@@ -1715,8 +1768,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
]
if len(remote_room_hosts) == 0:
- raise SynapseError(
- 404,
+ raise NoKnownServersError(
"Can't join remote room because no servers "
"that are in the room have been provided.",
)
@@ -1947,7 +1999,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
]
if len(remote_room_hosts) == 0:
- raise SynapseError(404, "No known servers")
+ raise NoKnownServersError()
return await self.federation_handler.do_knock(
remote_room_hosts, room_id, user.to_string(), content=content
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 221552a2a6..ba261702d4 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -15,8 +15,7 @@
import logging
from typing import TYPE_CHECKING, List, Optional, Tuple
-from synapse.api.errors import SynapseError
-from synapse.handlers.room_member import RoomMemberHandler
+from synapse.handlers.room_member import NoKnownServersError, RoomMemberHandler
from synapse.replication.http.membership import (
ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
ReplicationRemoteKnockRestServlet as ReplRemoteKnock,
@@ -52,7 +51,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
) -> Tuple[str, int]:
"""Implements RoomMemberHandler._remote_join"""
if len(remote_room_hosts) == 0:
- raise SynapseError(404, "No known servers")
+ raise NoKnownServersError()
ret = await self._remote_join_client(
requester=requester,
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 4472019fbc..807245160d 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -521,8 +521,8 @@ class RoomSummaryHandler:
It should return true if:
- * The requester is joined or can join the room (per MSC3173).
- * The origin server has any user that is joined or can join the room.
+ * The requesting user is joined or can join the room (per MSC3173); or
+ * The origin server has any user that is joined or can join the room; or
* The history visibility is set to world readable.
Args:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3566537894..4e4595312c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -269,6 +269,8 @@ class SyncHandler:
self._state_storage_controller = self._storage_controllers.state
self._device_handler = hs.get_device_handler()
+ self.should_calculate_push_rules = hs.config.push.enable_push
+
# TODO: flush cache entries on subsequent sync request.
# Once we get the next /sync request (ie, one with the same access token
# that sets 'since' to 'next_batch'), we know that device won't need a
@@ -1288,6 +1290,12 @@ class SyncHandler:
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> RoomNotifCounts:
+ if not self.should_calculate_push_rules:
+ # If push rules have been universally disabled then we know we won't
+ # have any unread counts in the DB, so we may as well skip asking
+ # the DB.
+ return RoomNotifCounts.empty()
+
with Measure(self.clock, "unread_notifs_for_room_id"):
return await self.store.get_unread_event_push_actions_by_room_for_user(
@@ -1391,6 +1399,11 @@ class SyncHandler:
for room_id, is_partial_state in results.items()
if is_partial_state
)
+ membership_change_events = [
+ event
+ for event in membership_change_events
+ if not results.get(event.room_id, False)
+ ]
# Incremental eager syncs should additionally include rooms that
# - we are joined to
@@ -1444,9 +1457,9 @@ class SyncHandler:
logger.debug("Fetching account data")
- account_data_by_room = await self._generate_sync_entry_for_account_data(
- sync_result_builder
- )
+ # Global account data is included if it is not filtered out.
+ if not sync_config.filter_collection.blocks_all_global_account_data():
+ await self._generate_sync_entry_for_account_data(sync_result_builder)
# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool(
@@ -1472,9 +1485,7 @@ class SyncHandler:
(
newly_joined_rooms,
newly_left_rooms,
- ) = await self._generate_sync_entry_for_rooms(
- sync_result_builder, account_data_by_room
- )
+ ) = await self._generate_sync_entry_for_rooms(sync_result_builder)
# Work out which users have joined or left rooms we're in. We use this
# to build the presence and device_list parts of the sync response in
@@ -1521,7 +1532,7 @@ class SyncHandler:
one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
- unused_fallback_key_types = (
+ unused_fallback_key_types = list(
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)
@@ -1717,35 +1728,29 @@ class SyncHandler:
async def _generate_sync_entry_for_account_data(
self, sync_result_builder: "SyncResultBuilder"
- ) -> Dict[str, Dict[str, JsonDict]]:
- """Generates the account data portion of the sync response.
+ ) -> None:
+ """Generates the global account data portion of the sync response.
Account data (called "Client Config" in the spec) can be set either globally
or for a specific room. Account data consists of a list of events which
accumulate state, much like a room.
- This function retrieves global and per-room account data. The former is written
- to the given `sync_result_builder`. The latter is returned directly, to be
- later written to the `sync_result_builder` on a room-by-room basis.
+ This function retrieves global account data and writes it to the given
+ `sync_result_builder`. See `_generate_sync_entry_for_rooms` for handling
+ of per-room account data.
Args:
sync_result_builder
-
- Returns:
- A dictionary whose keys (room ids) map to the per room account data for that
- room.
"""
sync_config = sync_result_builder.sync_config
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
if since_token and not sync_result_builder.full_state:
- # TODO Do not fetch room account data if it will be unused.
- (
- global_account_data,
- account_data_by_room,
- ) = await self.store.get_updated_account_data_for_user(
- user_id, since_token.account_data_key
+ global_account_data = (
+ await self.store.get_updated_global_account_data_for_user(
+ user_id, since_token.account_data_key
+ )
)
push_rules_changed = await self.store.have_push_rules_changed_for_user(
@@ -1753,31 +1758,31 @@ class SyncHandler:
)
if push_rules_changed:
+ global_account_data = dict(global_account_data)
global_account_data["m.push_rules"] = await self.push_rules_for_user(
sync_config.user
)
else:
- # TODO Do not fetch room account data if it will be unused.
- (
- global_account_data,
- account_data_by_room,
- ) = await self.store.get_account_data_for_user(sync_config.user.to_string())
+ all_global_account_data = await self.store.get_global_account_data_for_user(
+ user_id
+ )
+ global_account_data = dict(all_global_account_data)
global_account_data["m.push_rules"] = await self.push_rules_for_user(
sync_config.user
)
- account_data_for_user = await sync_config.filter_collection.filter_account_data(
- [
- {"type": account_data_type, "content": content}
- for account_data_type, content in global_account_data.items()
- ]
+ account_data_for_user = (
+ await sync_config.filter_collection.filter_global_account_data(
+ [
+ {"type": account_data_type, "content": content}
+ for account_data_type, content in global_account_data.items()
+ ]
+ )
)
sync_result_builder.account_data = account_data_for_user
- return account_data_by_room
-
async def _generate_sync_entry_for_presence(
self,
sync_result_builder: "SyncResultBuilder",
@@ -1837,9 +1842,7 @@ class SyncHandler:
sync_result_builder.presence = presence
async def _generate_sync_entry_for_rooms(
- self,
- sync_result_builder: "SyncResultBuilder",
- account_data_by_room: Dict[str, Dict[str, JsonDict]],
+ self, sync_result_builder: "SyncResultBuilder"
) -> Tuple[AbstractSet[str], AbstractSet[str]]:
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -1850,7 +1853,6 @@ class SyncHandler:
Args:
sync_result_builder
- account_data_by_room: Dictionary of per room account data
Returns:
Returns a 2-tuple describing rooms the user has joined or left.
@@ -1863,9 +1865,30 @@ class SyncHandler:
since_token = sync_result_builder.since_token
user_id = sync_result_builder.sync_config.user.to_string()
+ blocks_all_rooms = (
+ sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
+ )
+
+ # 0. Start by fetching room account data (if required).
+ if (
+ blocks_all_rooms
+ or sync_result_builder.sync_config.filter_collection.blocks_all_room_account_data()
+ ):
+ account_data_by_room: Mapping[str, Mapping[str, JsonDict]] = {}
+ elif since_token and not sync_result_builder.full_state:
+ account_data_by_room = (
+ await self.store.get_updated_room_account_data_for_user(
+ user_id, since_token.account_data_key
+ )
+ )
+ else:
+ account_data_by_room = await self.store.get_room_account_data_for_user(
+ user_id
+ )
+
# 1. Start by fetching all ephemeral events in rooms we've joined (if required).
block_all_room_ephemeral = (
- sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
+ blocks_all_rooms
or sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
)
if block_all_room_ephemeral:
@@ -2291,8 +2314,8 @@ class SyncHandler:
sync_result_builder: "SyncResultBuilder",
room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict],
- tags: Optional[Dict[str, Dict[str, Any]]],
- account_data: Dict[str, JsonDict],
+ tags: Optional[Mapping[str, Mapping[str, Any]]],
+ account_data: Mapping[str, JsonDict],
always_include: bool = False,
) -> None:
"""Populates the `joined` and `archived` section of `sync_result_builder`
|