diff --git a/synapse/handlers/account.py b/synapse/handlers/account.py
index d5badf635b..c05a14304c 100644
--- a/synapse/handlers/account.py
+++ b/synapse/handlers/account.py
@@ -26,6 +26,10 @@ class AccountHandler:
self._main_store = hs.get_datastores().main
self._is_mine = hs.is_mine
self._federation_client = hs.get_federation_client()
+ self._use_account_validity_in_account_status = (
+ hs.config.server.use_account_validity_in_account_status
+ )
+ self._account_validity_handler = hs.get_account_validity_handler()
async def get_account_statuses(
self,
@@ -106,6 +110,13 @@ class AccountHandler:
"deactivated": userinfo.is_deactivated,
}
+ if self._use_account_validity_in_account_status:
+ status[
+ "org.matrix.expired"
+ ] = await self._account_validity_handler.is_user_expired(
+ user_id.to_string()
+ )
+
return status
async def _get_remote_account_statuses(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index e6461cc3c9..bd913e524e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -571,7 +571,7 @@ class ApplicationServicesHandler:
room_alias_str = room_alias.to_string()
services = self.store.get_app_services()
alias_query_services = [
- s for s in services if (s.is_interested_in_alias(room_alias_str))
+ s for s in services if (s.is_room_alias_in_namespace(room_alias_str))
]
for alias_service in alias_query_services:
is_known_alias = await self.appservice_api.query_alias(
@@ -660,7 +660,7 @@ class ApplicationServicesHandler:
# inside of a list comprehension anymore.
interested_list = []
for s in services:
- if await s.is_interested(event, self.store):
+ if await s.is_interested_in_event(event.event_id, event, self.store):
interested_list.append(s)
return interested_list
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 76ae768e6e..816e1a6d79 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Optional
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import Requester, UserID, create_requester
+from synapse.types import Codes, Requester, UserID, create_requester
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -42,6 +42,7 @@ class DeactivateAccountHandler:
# Flag that indicates whether the process to part users from rooms is running
self._user_parter_running = False
+ self._third_party_rules = hs.get_third_party_event_rules()
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
@@ -74,6 +75,15 @@ class DeactivateAccountHandler:
Returns:
True if identity server supports removing threepids, otherwise False.
"""
+
+ # Check if this user can be deactivated
+ if not await self._third_party_rules.check_can_deactivate_user(
+ user_id, by_admin
+ ):
+ raise SynapseError(
+ 403, "Deactivation of this user is forbidden", Codes.FORBIDDEN
+ )
+
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 934b5bd734..d5ccaa0c37 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -371,7 +371,6 @@ class DeviceHandler(DeviceWorkerHandler):
log_kv(
{"reason": "User doesn't have device id.", "device_id": device_id}
)
- pass
else:
raise
@@ -414,7 +413,6 @@ class DeviceHandler(DeviceWorkerHandler):
# no match
set_tag("error", True)
set_tag("reason", "User doesn't have that device id.")
- pass
else:
raise
@@ -506,7 +504,7 @@ class DeviceHandler(DeviceWorkerHandler):
"Sending device list update notif for %r to: %r", user_id, hosts
)
for host in hosts:
- self.federation_sender.send_device_messages(host)
+ self.federation_sender.send_device_messages(host, immediate=False)
log_kv({"message": "sent device update to host", "host": host})
async def notify_user_signature_update(
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index b7064c6624..33d827a45b 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler:
service = requester.app_service
if service:
- if not service.is_interested_in_alias(room_alias_str):
+ if not service.is_room_alias_in_namespace(room_alias_str):
raise SynapseError(
400,
"This application service has not reserved this kind of alias.",
@@ -221,7 +221,7 @@ class DirectoryHandler:
async def delete_appservice_association(
self, service: ApplicationService, room_alias: RoomAlias
) -> None:
- if not service.is_interested_in_alias(room_alias.to_string()):
+ if not service.is_room_alias_in_namespace(room_alias.to_string()):
raise SynapseError(
400,
"This application service has not reserved this kind of alias",
@@ -376,7 +376,7 @@ class DirectoryHandler:
# non-exclusive locks on the alias (or there are no interested services)
services = self.store.get_app_services()
interested_services = [
- s for s in services if s.is_interested_in_alias(alias.to_string())
+ s for s in services if s.is_room_alias_in_namespace(alias.to_string())
]
for service in interested_services:
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 97e75e60c3..d2ccb5c5d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Iterable, List, Optional
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
+from synapse.events.utils import SerializeEventConfig
from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, UserID
@@ -120,7 +121,7 @@ class EventStreamHandler:
chunks = self._event_serializer.serialize_events(
events,
time_now,
- as_client_event=as_client_event,
+ config=SerializeEventConfig(as_client_event=as_client_event),
)
chunk = {
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index eb03a5accb..350ec9c03a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -23,8 +23,6 @@ from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
-from twisted.internet import defer
-
from synapse import event_auth
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.errors import (
@@ -45,11 +43,7 @@ from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
from synapse.http.servlet import assert_params_in_dict
-from synapse.logging.context import (
- make_deferred_yieldable,
- nested_logging_context,
- preserve_fn,
-)
+from synapse.logging.context import nested_logging_context
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@@ -355,56 +349,8 @@ class FederationHandler:
if success:
return True
- # Huh, well *those* domains didn't work out. Lets try some domains
- # from the time.
-
- tried_domains = set(likely_domains)
- tried_domains.add(self.server_name)
-
- event_ids = list(extremities.keys())
-
- logger.debug("calling resolve_state_groups in _maybe_backfill")
- resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
- states_list = await make_deferred_yieldable(
- defer.gatherResults(
- [resolve(room_id, [e]) for e in event_ids], consumeErrors=True
- )
- )
-
- # A map from event_id to state map of event_ids.
- state_ids: Dict[str, StateMap[str]] = dict(
- zip(event_ids, [s.state for s in states_list])
- )
-
- state_map = await self.store.get_events(
- [e_id for ids in state_ids.values() for e_id in ids.values()],
- get_prev_content=False,
- )
-
- # A map from event_id to state map of events.
- state_events: Dict[str, StateMap[EventBase]] = {
- key: {
- k: state_map[e_id]
- for k, e_id in state_dict.items()
- if e_id in state_map
- }
- for key, state_dict in state_ids.items()
- }
-
- for e_id in event_ids:
- likely_extremeties_domains = get_domains_from_state(state_events[e_id])
-
- success = await try_backfill(
- [
- dom
- for dom, _ in likely_extremeties_domains
- if dom not in tried_domains
- ]
- )
- if success:
- return True
-
- tried_domains.update(dom for dom, _ in likely_extremeties_domains)
+ # TODO: we could also try servers which were previously in the room, but
+ # are no longer.
return False
@@ -1004,54 +950,35 @@ class FederationHandler:
return event
- async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]:
- """Returns the state at the event. i.e. not including said event."""
-
- event = await self.store.get_event(event_id, check_room_id=room_id)
-
- state_groups = await self.state_store.get_state_groups(room_id, [event_id])
-
- if state_groups:
- _, state = list(state_groups.items()).pop()
- results = {(e.type, e.state_key): e for e in state}
-
- if event.is_state():
- # Get previous state
- if "replaces_state" in event.unsigned:
- prev_id = event.unsigned["replaces_state"]
- if prev_id != event.event_id:
- prev_event = await self.store.get_event(prev_id)
- results[(event.type, event.state_key)] = prev_event
- else:
- del results[(event.type, event.state_key)]
-
- res = list(results.values())
- return res
- else:
- return []
-
async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
"""Returns the state at the event. i.e. not including said event."""
event = await self.store.get_event(event_id, check_room_id=room_id)
+ if event.internal_metadata.outlier:
+ raise NotFoundError("State not known at event %s" % (event_id,))
state_groups = await self.state_store.get_state_groups_ids(room_id, [event_id])
- if state_groups:
- _, state = list(state_groups.items()).pop()
- results = state
+ # get_state_groups_ids should return exactly one result
+ assert len(state_groups) == 1
- if event.is_state():
- # Get previous state
- if "replaces_state" in event.unsigned:
- prev_id = event.unsigned["replaces_state"]
- if prev_id != event.event_id:
- results[(event.type, event.state_key)] = prev_id
- else:
- results.pop((event.type, event.state_key), None)
+ state_map = next(iter(state_groups.values()))
- return list(results.values())
- else:
- return []
+ state_key = event.get_state_key()
+ if state_key is not None:
+ # the event was not rejected (get_event raises a NotFoundError for rejected
+ # events) so the state at the event should include the event itself.
+ assert (
+ state_map.get((event.type, state_key)) == event.event_id
+ ), "State at event did not include event itself"
+
+ # ... but we need the state *before* that event
+ if "replaces_state" in event.unsigned:
+ prev_id = event.unsigned["replaces_state"]
+ state_map[(event.type, state_key)] = prev_id
+ else:
+ del state_map[(event.type, state_key)]
+
+ return list(state_map.values())
async def on_backfill_request(
self, origin: str, room_id: str, pdu_list: List[str], limit: int
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 344f20f37c..a7db8feb57 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events import EventBase
+from synapse.events.utils import SerializeEventConfig
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.receipts import ReceiptEventSource
@@ -152,10 +153,13 @@ class InitialSyncHandler:
public_room_ids = await self.store.get_public_room_ids()
- limit = pagin_config.limit
- if limit is None:
+ if pagin_config.limit is not None:
+ limit = pagin_config.limit
+ else:
limit = 10
+ serializer_options = SerializeEventConfig(as_client_event=as_client_event)
+
async def handle_room(event: RoomsForUser) -> None:
d: JsonDict = {
"room_id": event.room_id,
@@ -173,7 +177,7 @@ class InitialSyncHandler:
d["invite"] = self._event_serializer.serialize_event(
invite_event,
time_now,
- as_client_event=as_client_event,
+ config=serializer_options,
)
rooms_ret.append(d)
@@ -225,7 +229,7 @@ class InitialSyncHandler:
self._event_serializer.serialize_events(
messages,
time_now=time_now,
- as_client_event=as_client_event,
+ config=serializer_options,
)
),
"start": await start_token.to_string(self.store),
@@ -235,7 +239,7 @@ class InitialSyncHandler:
d["state"] = self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
- as_client_event=as_client_event,
+ config=serializer_options,
)
account_data_events = []
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 61cb133ef2..1c4fb4360a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -493,6 +493,7 @@ class EventCreationHandler:
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
+ state_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
@@ -527,6 +528,15 @@ class EventCreationHandler:
If non-None, prev_event_ids must also be provided.
+ state_event_ids:
+ The full state at a given event. This is used particularly by the MSC2716
+ /batch_send endpoint. One use case is with insertion events which float at
+ the beginning of a historical batch and don't have any `prev_events` to
+ derive from; we add all of these state events as the explicit state so the
+ rest of the historical batch can inherit the same state and state_group.
+ This should normally be left as None, which will cause the auth_event_ids
+ to be calculated based on the room state at the prev_events.
+
require_consent: Whether to check if the requester has
consented to the privacy policy.
@@ -612,6 +622,7 @@ class EventCreationHandler:
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
+ state_event_ids=state_event_ids,
depth=depth,
)
@@ -772,6 +783,7 @@ class EventCreationHandler:
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
+ state_event_ids: Optional[List[str]] = None,
ratelimit: bool = True,
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
@@ -801,6 +813,14 @@ class EventCreationHandler:
based on the room state at the prev_events.
If non-None, prev_event_ids must also be provided.
+ state_event_ids:
+ The full state at a given event. This is used particularly by the MSC2716
+ /batch_send endpoint. One use case is with insertion events which float at
+ the beginning of a historical batch and don't have any `prev_events` to
+ derive from; we add all of these state events as the explicit state so the
+ rest of the historical batch can inherit the same state and state_group.
+ This should normally be left as None, which will cause the auth_event_ids
+ to be calculated based on the room state at the prev_events.
ratelimit: Whether to rate limit this send.
txn_id: The transaction ID.
ignore_shadow_ban: True if shadow-banned users should be allowed to
@@ -856,8 +876,10 @@ class EventCreationHandler:
requester,
event_dict,
txn_id=txn_id,
+ allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
+ state_event_ids=state_event_ids,
outlier=outlier,
historical=historical,
depth=depth,
@@ -893,6 +915,7 @@ class EventCreationHandler:
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
+ state_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@@ -915,6 +938,15 @@ class EventCreationHandler:
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
+ state_event_ids:
+ The full state at a given event. This is used particularly by the MSC2716
+ /batch_send endpoint. One use case is with insertion events which float at
+ the beginning of a historical batch and don't have any `prev_events` to
+ derive from; we add all of these state events as the explicit state so the
+ rest of the historical batch can inherit the same state and state_group.
+ This should normally be left as None, which will cause the auth_event_ids
+ to be calculated based on the room state at the prev_events.
+
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
@@ -922,31 +954,26 @@ class EventCreationHandler:
Returns:
Tuple of created event, context
"""
- # Strip down the auth_event_ids to only what we need to auth the event.
+ # Strip down the state_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
- full_state_ids_at_event = None
- if auth_event_ids is not None:
- # If auth events are provided, prev events must be also.
+ if state_event_ids is not None:
+ # Do a quick check to make sure that prev_event_ids is present to
+ # make the type-checking around `builder.build` happy.
# prev_event_ids could be an empty array though.
assert prev_event_ids is not None
- # Copy the full auth state before it stripped down
- full_state_ids_at_event = auth_event_ids.copy()
-
temp_event = await builder.build(
prev_event_ids=prev_event_ids,
- auth_event_ids=auth_event_ids,
+ auth_event_ids=state_event_ids,
depth=depth,
)
- auth_events = await self.store.get_events_as_list(auth_event_ids)
+ state_events = await self.store.get_events_as_list(state_event_ids)
# Create a StateMap[str]
- auth_event_state_map = {
- (e.type, e.state_key): e.event_id for e in auth_events
- }
- # Actually strip down and use the necessary auth events
+ state_map = {(e.type, e.state_key): e.event_id for e in state_events}
+ # Actually strip down and only use the necessary auth events
auth_event_ids = self._event_auth_handler.compute_auth_events(
event=temp_event,
- current_state_ids=auth_event_state_map,
+ current_state_ids=state_map,
for_verification=False,
)
@@ -989,12 +1016,16 @@ class EventCreationHandler:
context = EventContext.for_outlier()
elif (
event.type == EventTypes.MSC2716_INSERTION
- and full_state_ids_at_event
+ and state_event_ids
and builder.internal_metadata.is_historical()
):
+ # Add explicit state to the insertion event so it has state to derive
+ # from even though it's floating with no `prev_events`. The rest of
+ # the batch can derive from this state and state_group.
+ #
# TODO(faster_joins): figure out how this works, and make sure that the
# old state is complete.
- old_state = await self.store.get_events_as_list(full_state_ids_at_event)
+ old_state = await self.store.get_events_as_list(state_event_ids)
context = await self.state.compute_event_context(event, old_state=old_state)
else:
context = await self.state.compute_event_context(event)
@@ -1069,6 +1100,9 @@ class EventCreationHandler:
if relation_type == RelationTypes.ANNOTATION:
aggregation_key = relation["key"]
+ if len(aggregation_key) > 500:
+ raise SynapseError(400, "Aggregation key is too long")
+
already_exists = await self.store.has_user_annotated_event(
relates_to, event.type, aggregation_key, event.sender
)
@@ -1076,7 +1110,10 @@ class EventCreationHandler:
raise SynapseError(400, "Can't send same reaction twice")
# Don't attempt to start a thread if the parent event is a relation.
- elif relation_type == RelationTypes.THREAD:
+ elif (
+ relation_type == RelationTypes.THREAD
+ or relation_type == RelationTypes.UNSTABLE_THREAD
+ ):
if await self.store.event_includes_relation(relates_to):
raise SynapseError(
400, "Cannot start threads from an event with a relation"
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index 593a2aac66..724b9cfcb4 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -45,6 +45,7 @@ from synapse.types import JsonDict, UserID, map_username_to_mxid_localpart
from synapse.util import Clock, json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
+from synapse.util.templates import _localpart_from_email_filter
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -1228,6 +1229,7 @@ class OidcSessionData:
class UserAttributeDict(TypedDict):
localpart: Optional[str]
+ confirm_localpart: bool
display_name: Optional[str]
emails: List[str]
@@ -1307,6 +1309,11 @@ def jinja_finalize(thing: Any) -> Any:
env = Environment(finalize=jinja_finalize)
+env.filters.update(
+ {
+ "localpart_from_email": _localpart_from_email_filter,
+ }
+)
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -1316,6 +1323,7 @@ class JinjaOidcMappingConfig:
display_name_template: Optional[Template]
email_template: Optional[Template]
extra_attributes: Dict[str, Template]
+ confirm_localpart: bool = False
class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
@@ -1357,12 +1365,17 @@ class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
"invalid jinja template", path=["extra_attributes", key]
) from e
+ confirm_localpart = config.get("confirm_localpart") or False
+ if not isinstance(confirm_localpart, bool):
+ raise ConfigError("must be a bool", path=["confirm_localpart"])
+
return JinjaOidcMappingConfig(
subject_claim=subject_claim,
localpart_template=localpart_template,
display_name_template=display_name_template,
email_template=email_template,
extra_attributes=extra_attributes,
+ confirm_localpart=confirm_localpart,
)
def get_remote_user_id(self, userinfo: UserInfo) -> str:
@@ -1398,7 +1411,10 @@ class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
emails.append(email)
return UserAttributeDict(
- localpart=localpart, display_name=display_name, emails=emails
+ localpart=localpart,
+ display_name=display_name,
+ emails=emails,
+ confirm_localpart=self._config.confirm_localpart,
)
async def get_extra_attributes(self, userinfo: UserInfo, token: Token) -> JsonDict:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5c01a426ff..876b879483 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Set
+from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set
import attr
@@ -22,6 +22,7 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
+from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomResponse
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
@@ -133,6 +134,7 @@ class PaginationHandler:
self.clock = hs.get_clock()
self._server_name = hs.hostname
self._room_shutdown_handler = hs.get_room_shutdown_handler()
+ self._relations_handler = hs.get_relations_handler()
self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there currently an active purge *or delete* operation.
@@ -349,7 +351,7 @@ class PaginationHandler:
"""
self._purges_in_progress_by_room.add(room_id)
try:
- with await self.pagination_lock.write(room_id):
+ async with self.pagination_lock.write(room_id):
await self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
@@ -405,7 +407,7 @@ class PaginationHandler:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
- with await self.pagination_lock.write(room_id):
+ async with self.pagination_lock.write(room_id):
# first check that we have no users in this room
if not force:
joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -421,7 +423,7 @@ class PaginationHandler:
pagin_config: PaginationConfig,
as_client_event: bool = True,
event_filter: Optional[Filter] = None,
- ) -> Dict[str, Any]:
+ ) -> JsonDict:
"""Get messages in a room.
Args:
@@ -430,6 +432,7 @@ class PaginationHandler:
pagin_config: The pagination config rules to apply, if any.
as_client_event: True to get events in client-server format.
event_filter: Filter to apply to results or None
+
Returns:
Pagination API results
"""
@@ -447,7 +450,7 @@ class PaginationHandler:
room_token = from_token.room_key
- with await self.pagination_lock.read(room_id):
+ async with self.pagination_lock.read(room_id):
(
membership,
member_event_id,
@@ -537,17 +540,21 @@ class PaginationHandler:
state_dict = await self.store.get_events(list(state_ids.values()))
state = state_dict.values()
- aggregations = await self.store.get_bundled_aggregations(events, user_id)
+ aggregations = await self._relations_handler.get_bundled_aggregations(
+ events, user_id
+ )
time_now = self.clock.time_msec()
+ serialize_options = SerializeEventConfig(as_client_event=as_client_event)
+
chunk = {
"chunk": (
self._event_serializer.serialize_events(
events,
time_now,
+ config=serialize_options,
bundle_aggregations=aggregations,
- as_client_event=as_client_event,
)
),
"start": await from_token.to_string(self.store),
@@ -556,7 +563,7 @@ class PaginationHandler:
if state:
chunk["state"] = self._event_serializer.serialize_events(
- state, time_now, as_client_event=as_client_event
+ state, time_now, config=serialize_options
)
return chunk
@@ -612,7 +619,7 @@ class PaginationHandler:
self._purges_in_progress_by_room.add(room_id)
try:
- with await self.pagination_lock.write(room_id):
+ async with self.pagination_lock.write(room_id):
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
delete_id
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index c155098bee..34d9411bbf 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -267,7 +267,6 @@ class BasePresenceHandler(abc.ABC):
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
- pass
async def update_external_syncs_clear(self, process_id: str) -> None:
"""Marks all users that had been marked as syncing by a given process
@@ -277,7 +276,6 @@ class BasePresenceHandler(abc.ABC):
This is a no-op when presence is handled by a different worker.
"""
- pass
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: list
@@ -424,13 +422,13 @@ class WorkerPresenceHandler(BasePresenceHandler):
async def _on_shutdown(self) -> None:
if self._presence_enabled:
- self.hs.get_tcp_replication().send_command(
+ self.hs.get_replication_command_handler().send_command(
ClearUserSyncsCommand(self.instance_id)
)
def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
if self._presence_enabled:
- self.hs.get_tcp_replication().send_user_sync(
+ self.hs.get_replication_command_handler().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 6554c0d3c2..239b0aa744 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -336,12 +336,18 @@ class ProfileHandler:
"""Check that the size and content type of the avatar at the given MXC URI are
within the configured limits.
+ If the given `mxc` is empty, no checks are performed. (Users are always able to
+ unset their avatar.)
+
Args:
mxc: The MXC URI at which the avatar can be found.
Returns:
A boolean indicating whether the file can be allowed to be set as an avatar.
"""
+ if mxc == "":
+ return True
+
if not self.max_avatar_size and not self.allowed_avatar_mimetypes:
return True
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index b4132c353a..6250bb3bdf 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -269,7 +269,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
# Then filter down to rooms that the AS can read
events = []
for room_id, event in rooms_to_events.items():
- if not await service.matches_user_in_member_list(room_id, self.store):
+ if not await service.is_interested_in_room(room_id, self.store):
continue
events.append(event)
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
new file mode 100644
index 0000000000..73217d135d
--- /dev/null
+++ b/synapse/handlers/relations.py
@@ -0,0 +1,271 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING, Dict, Iterable, Optional, cast
+
+import attr
+from frozendict import frozendict
+
+from synapse.api.constants import RelationTypes
+from synapse.api.errors import SynapseError
+from synapse.events import EventBase
+from synapse.types import JsonDict, Requester, StreamToken
+from synapse.visibility import filter_events_for_client
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+ from synapse.storage.databases.main import DataStore
+
+
+logger = logging.getLogger(__name__)
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _ThreadAggregation:
+ # The latest event in the thread.
+ latest_event: EventBase
+ # The latest edit to the latest event in the thread.
+ latest_edit: Optional[EventBase]
+ # The total number of events in the thread.
+ count: int
+ # True if the current user has sent an event to the thread.
+ current_user_participated: bool
+
+
+@attr.s(slots=True, auto_attribs=True)
+class BundledAggregations:
+ """
+ The bundled aggregations for an event.
+
+ Some values require additional processing during serialization.
+ """
+
+ annotations: Optional[JsonDict] = None
+ references: Optional[JsonDict] = None
+ replace: Optional[EventBase] = None
+ thread: Optional[_ThreadAggregation] = None
+
+ def __bool__(self) -> bool:
+ return bool(self.annotations or self.references or self.replace or self.thread)
+
+
+class RelationsHandler:
+ def __init__(self, hs: "HomeServer"):
+ self._main_store = hs.get_datastores().main
+ self._storage = hs.get_storage()
+ self._auth = hs.get_auth()
+ self._clock = hs.get_clock()
+ self._event_handler = hs.get_event_handler()
+ self._event_serializer = hs.get_event_client_serializer()
+
+ async def get_relations(
+ self,
+ requester: Requester,
+ event_id: str,
+ room_id: str,
+ relation_type: Optional[str] = None,
+ event_type: Optional[str] = None,
+ aggregation_key: Optional[str] = None,
+ limit: int = 5,
+ direction: str = "b",
+ from_token: Optional[StreamToken] = None,
+ to_token: Optional[StreamToken] = None,
+ ) -> JsonDict:
+ """Get related events of a event, ordered by topological ordering.
+
+ TODO Accept a PaginationConfig instead of individual pagination parameters.
+
+ Args:
+ requester: The user requesting the relations.
+ event_id: Fetch events that relate to this event ID.
+ room_id: The room the event belongs to.
+ relation_type: Only fetch events with this relation type, if given.
+ event_type: Only fetch events with this event type, if given.
+ aggregation_key: Only fetch events with this aggregation key, if given.
+ limit: Only fetch the most recent `limit` events.
+ direction: Whether to fetch the most recent first (`"b"`) or the
+ oldest first (`"f"`).
+ from_token: Fetch rows from the given token, or from the start if None.
+ to_token: Fetch rows up to the given token, or up to the end if None.
+
+ Returns:
+ The pagination chunk.
+ """
+
+ user_id = requester.user.to_string()
+
+ # TODO Properly handle a user leaving a room.
+ (_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
+ room_id, user_id, allow_departed_users=True
+ )
+
+ # This gets the original event and checks that a) the event exists and
+ # b) the user is allowed to view it.
+ event = await self._event_handler.get_event(requester.user, room_id, event_id)
+ if event is None:
+ raise SynapseError(404, "Unknown parent event.")
+
+ pagination_chunk = await self._main_store.get_relations_for_event(
+ event_id=event_id,
+ event=event,
+ room_id=room_id,
+ relation_type=relation_type,
+ event_type=event_type,
+ aggregation_key=aggregation_key,
+ limit=limit,
+ direction=direction,
+ from_token=from_token,
+ to_token=to_token,
+ )
+
+ events = await self._main_store.get_events_as_list(
+ [c["event_id"] for c in pagination_chunk.chunk]
+ )
+
+ events = await filter_events_for_client(
+ self._storage, user_id, events, is_peeking=(member_event_id is None)
+ )
+
+ now = self._clock.time_msec()
+ # Do not bundle aggregations when retrieving the original event because
+ # we want the content before relations are applied to it.
+ original_event = self._event_serializer.serialize_event(
+ event, now, bundle_aggregations=None
+ )
+ # The relations returned for the requested event do include their
+ # bundled aggregations.
+ aggregations = await self.get_bundled_aggregations(
+ events, requester.user.to_string()
+ )
+ serialized_events = self._event_serializer.serialize_events(
+ events, now, bundle_aggregations=aggregations
+ )
+
+ return_value = await pagination_chunk.to_dict(self._main_store)
+ return_value["chunk"] = serialized_events
+ return_value["original_event"] = original_event
+
+ return return_value
+
+ async def _get_bundled_aggregation_for_event(
+ self, event: EventBase, user_id: str
+ ) -> Optional[BundledAggregations]:
+ """Generate bundled aggregations for an event.
+
+ Note that this does not use a cache, but depends on cached methods.
+
+ Args:
+ event: The event to calculate bundled aggregations for.
+ user_id: The user requesting the bundled aggregations.
+
+ Returns:
+ The bundled aggregations for an event, if bundled aggregations are
+ enabled and the event can have bundled aggregations.
+ """
+
+ # Do not bundle aggregations for an event which represents an edit or an
+ # annotation. It does not make sense for them to have related events.
+ relates_to = event.content.get("m.relates_to")
+ if isinstance(relates_to, (dict, frozendict)):
+ relation_type = relates_to.get("rel_type")
+ if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
+ return None
+
+ event_id = event.event_id
+ room_id = event.room_id
+
+ # The bundled aggregations to include, a mapping of relation type to a
+ # type-specific value. Some types include the direct return type here
+ # while others need more processing during serialization.
+ aggregations = BundledAggregations()
+
+ annotations = await self._main_store.get_aggregation_groups_for_event(
+ event_id, room_id
+ )
+ if annotations.chunk:
+ aggregations.annotations = await annotations.to_dict(
+ cast("DataStore", self)
+ )
+
+ references = await self._main_store.get_relations_for_event(
+ event_id, event, room_id, RelationTypes.REFERENCE, direction="f"
+ )
+ if references.chunk:
+ aggregations.references = await references.to_dict(cast("DataStore", self))
+
+ # Store the bundled aggregations in the event metadata for later use.
+ return aggregations
+
+ async def get_bundled_aggregations(
+ self, events: Iterable[EventBase], user_id: str
+ ) -> Dict[str, BundledAggregations]:
+ """Generate bundled aggregations for events.
+
+ Args:
+ events: The iterable of events to calculate bundled aggregations for.
+ user_id: The user requesting the bundled aggregations.
+
+ Returns:
+ A map of event ID to the bundled aggregation for the event. Not all
+ events may have bundled aggregations in the results.
+ """
+ # De-duplicate events by ID to handle the same event requested multiple times.
+ #
+ # State events do not get bundled aggregations.
+ events_by_id = {
+ event.event_id: event for event in events if not event.is_state()
+ }
+
+ # event ID -> bundled aggregation in non-serialized form.
+ results: Dict[str, BundledAggregations] = {}
+
+ # Fetch other relations per event.
+ for event in events_by_id.values():
+ event_result = await self._get_bundled_aggregation_for_event(event, user_id)
+ if event_result:
+ results[event.event_id] = event_result
+
+ # Fetch any edits (but not for redacted events).
+ edits = await self._main_store.get_applicable_edits(
+ [
+ event_id
+ for event_id, event in events_by_id.items()
+ if not event.internal_metadata.is_redacted()
+ ]
+ )
+ for event_id, edit in edits.items():
+ results.setdefault(event_id, BundledAggregations()).replace = edit
+
+ # Fetch thread summaries.
+ summaries = await self._main_store.get_thread_summaries(events_by_id.keys())
+ # Only fetch participated for a limited selection based on what had
+ # summaries.
+ participated = await self._main_store.get_threads_participated(
+ [event_id for event_id, summary in summaries.items() if summary], user_id
+ )
+ for event_id, summary in summaries.items():
+ if summary:
+ thread_count, latest_thread_event, edit = summary
+ results.setdefault(
+ event_id, BundledAggregations()
+ ).thread = _ThreadAggregation(
+ latest_event=latest_thread_event,
+ latest_edit=edit,
+ count=thread_count,
+ # If there's a thread summary it must also exist in the
+ # participated dictionary.
+ current_user_participated=participated[event_id],
+ )
+
+ return results
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7b965b4b96..092e185c99 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -60,8 +60,8 @@ from synapse.events import EventBase
from synapse.events.utils import copy_power_levels_contents
from synapse.federation.federation_client import InvalidResponseError
from synapse.handlers.federation import get_domains_from_state
+from synapse.handlers.relations import BundledAggregations
from synapse.rest.admin._base import assert_user_is_admin
-from synapse.storage.databases.main.relations import BundledAggregations
from synapse.storage.state import StateFilter
from synapse.streams import EventSource
from synapse.types import (
@@ -1118,6 +1118,7 @@ class RoomContextHandler:
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.state_store = self.storage.state
+ self._relations_handler = hs.get_relations_handler()
async def get_event_context(
self,
@@ -1190,7 +1191,7 @@ class RoomContextHandler:
event = filtered[0]
# Fetch the aggregations.
- aggregations = await self.store.get_bundled_aggregations(
+ aggregations = await self._relations_handler.get_bundled_aggregations(
itertools.chain(events_before, (event,), events_after),
user.to_string(),
)
@@ -1475,6 +1476,7 @@ class RoomShutdownHandler:
self.room_member_handler = hs.get_room_member_handler()
self._room_creation_handler = hs.get_room_creation_handler()
self._replication = hs.get_replication_data_handler()
+ self._third_party_rules = hs.get_third_party_event_rules()
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastores().main
@@ -1548,6 +1550,13 @@ class RoomShutdownHandler:
if not RoomID.is_valid(room_id):
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
+ if not await self._third_party_rules.check_can_shutdown_room(
+ requester_user_id, room_id
+ ):
+ raise SynapseError(
+ 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
+ )
+
# Action the block first (even if the room doesn't exist yet)
if block:
# This will work even if the room is already blocked, but that is
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index abbf7b7b27..a0255bd143 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -121,12 +121,11 @@ class RoomBatchHandler:
return create_requester(user_id, app_service=app_service)
- async def get_most_recent_auth_event_ids_from_event_id_list(
+ async def get_most_recent_full_state_ids_from_event_id_list(
self, event_ids: List[str]
) -> List[str]:
- """Find the most recent auth event ids (derived from state events) that
- allowed that message to be sent. We will use this as a base
- to auth our historical messages against.
+ """Find the most recent event_id and grab the full state at that event.
+ We will use this as a base to auth our historical messages against.
Args:
event_ids: List of event ID's to look at
@@ -136,38 +135,37 @@ class RoomBatchHandler:
"""
(
- most_recent_prev_event_id,
+ most_recent_event_id,
_,
) = await self.store.get_max_depth_of(event_ids)
# mapping from (type, state_key) -> state_event_id
prev_state_map = await self.state_store.get_state_ids_for_event(
- most_recent_prev_event_id
+ most_recent_event_id
)
# List of state event ID's
- prev_state_ids = list(prev_state_map.values())
- auth_event_ids = prev_state_ids
+ full_state_ids = list(prev_state_map.values())
- return auth_event_ids
+ return full_state_ids
async def persist_state_events_at_start(
self,
state_events_at_start: List[JsonDict],
room_id: str,
- initial_auth_event_ids: List[str],
+ initial_state_event_ids: List[str],
app_service_requester: Requester,
) -> List[str]:
"""Takes all `state_events_at_start` event dictionaries and creates/persists
- them as floating state events which don't resolve into the current room state.
- They are floating because they reference a fake prev_event which doesn't connect
- to the normal DAG at all.
+ them in a floating state event chain which don't resolve into the current room
+ state. They are floating because they reference no prev_events and are marked
+ as outliers which disconnects them from the normal DAG.
Args:
state_events_at_start:
room_id: Room where you want the events persisted in.
- initial_auth_event_ids: These will be the auth_events for the first
- state event created. Each event created afterwards will be
- added to the list of auth events for the next state event
- created.
+ initial_state_event_ids:
+ The base set of state for the historical batch which the floating
+ state chain will derive from. This should probably be the state
+ from the `prev_event` defined by `/batch_send?prev_event_id=$abc`.
app_service_requester: The requester of an application service.
Returns:
@@ -176,7 +174,7 @@ class RoomBatchHandler:
assert app_service_requester.app_service
state_event_ids_at_start = []
- auth_event_ids = initial_auth_event_ids.copy()
+ state_event_ids = initial_state_event_ids.copy()
# Make the state events float off on their own by specifying no
# prev_events for the first one in the chain so we don't have a bunch of
@@ -189,9 +187,7 @@ class RoomBatchHandler:
)
logger.debug(
- "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s",
- state_event,
- auth_event_ids,
+ "RoomBatchSendEventRestServlet inserting state_event=%s", state_event
)
event_dict = {
@@ -217,16 +213,26 @@ class RoomBatchHandler:
room_id=room_id,
action=membership,
content=event_dict["content"],
+ # Mark as an outlier to disconnect it from the normal DAG
+ # and not show up between batches of history.
outlier=True,
historical=True,
- # Only the first event in the chain should be floating.
+ # Only the first event in the state chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
+ # Since each state event is marked as an outlier, the
+ # `EventContext.for_outlier()` won't have any `state_ids`
+ # set and therefore can't derive any state even though the
+ # prev_events are set. Also since the first event in the
+ # state chain is floating with no `prev_events`, it can't
+ # derive state from anywhere automatically. So we need to
+ # set some state explicitly.
+ #
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
- auth_event_ids=auth_event_ids.copy(),
+ state_event_ids=state_event_ids.copy(),
)
else:
# TODO: Add some complement tests that adds state that is not member joins
@@ -240,21 +246,31 @@ class RoomBatchHandler:
state_event["sender"], app_service_requester.app_service
),
event_dict,
+ # Mark as an outlier to disconnect it from the normal DAG
+ # and not show up between batches of history.
outlier=True,
historical=True,
- # Only the first event in the chain should be floating.
+ # Only the first event in the state chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
+ # Since each state event is marked as an outlier, the
+ # `EventContext.for_outlier()` won't have any `state_ids`
+ # set and therefore can't derive any state even though the
+ # prev_events are set. Also since the first event in the
+ # state chain is floating with no `prev_events`, it can't
+ # derive state from anywhere automatically. So we need to
+ # set some state explicitly.
+ #
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
- auth_event_ids=auth_event_ids.copy(),
+ state_event_ids=state_event_ids.copy(),
)
event_id = event.event_id
state_event_ids_at_start.append(event_id)
- auth_event_ids.append(event_id)
+ state_event_ids.append(event_id)
# Connect all the state in a floating chain
prev_event_ids_for_state_chain = [event_id]
@@ -265,7 +281,7 @@ class RoomBatchHandler:
events_to_create: List[JsonDict],
room_id: str,
inherited_depth: int,
- auth_event_ids: List[str],
+ initial_state_event_ids: List[str],
app_service_requester: Requester,
) -> List[str]:
"""Create and persists all events provided sequentially. Handles the
@@ -281,8 +297,10 @@ class RoomBatchHandler:
room_id: Room where you want the events persisted in.
inherited_depth: The depth to create the events at (you will
probably by calling inherit_depth_from_prev_ids(...)).
- auth_event_ids: Define which events allow you to create the given
- event in the room.
+ initial_state_event_ids:
+ This is used to set explicit state for the insertion event at
+ the start of the historical batch since it's floating with no
+ prev_events to derive state from automatically.
app_service_requester: The requester of an application service.
Returns:
@@ -290,6 +308,11 @@ class RoomBatchHandler:
"""
assert app_service_requester.app_service
+ # We expect the first event in a historical batch to be an insertion event
+ assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
+ # We expect the last event in a historical batch to be an batch event
+ assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
+
# Make the historical event chain float off on its own by specifying no
# prev_events for the first event in the chain which causes the HS to
# ask for the state at the start of the batch later.
@@ -321,11 +344,16 @@ class RoomBatchHandler:
ev["sender"], app_service_requester.app_service
),
event_dict,
- # Only the first event in the chain should be floating.
- # The rest should hang off each other in a chain.
+ # Only the first event (which is the insertion event) in the
+ # chain should be floating. The rest should hang off each other
+ # in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=event_dict.get("prev_events"),
- auth_event_ids=auth_event_ids,
+ # Since the first event (which is the insertion event) in the
+ # chain is floating with no `prev_events`, it can't derive state
+ # from anywhere automatically. So we need to set some state
+ # explicitly.
+ state_event_ids=initial_state_event_ids if index == 0 else None,
historical=True,
depth=inherited_depth,
)
@@ -343,10 +371,9 @@ class RoomBatchHandler:
)
logger.debug(
- "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
+ "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s",
event,
prev_event_ids,
- auth_event_ids,
)
events_to_persist.append((event, context))
@@ -376,12 +403,12 @@ class RoomBatchHandler:
room_id: str,
batch_id_to_connect_to: str,
inherited_depth: int,
- auth_event_ids: List[str],
+ initial_state_event_ids: List[str],
app_service_requester: Requester,
) -> Tuple[List[str], str]:
"""
- Handles creating and persisting all of the historical events as well
- as insertion and batch meta events to make the batch navigable in the DAG.
+ Handles creating and persisting all of the historical events as well as
+ insertion and batch meta events to make the batch navigable in the DAG.
Args:
events_to_create: List of historical events to create in JSON
@@ -391,8 +418,13 @@ class RoomBatchHandler:
want this batch to connect to.
inherited_depth: The depth to create the events at (you will
probably by calling inherit_depth_from_prev_ids(...)).
- auth_event_ids: Define which events allow you to create the given
- event in the room.
+ initial_state_event_ids:
+ This is used to set explicit state for the insertion event at
+ the start of the historical batch since it's floating with no
+ prev_events to derive state from automatically. This should
+ probably be the state from the `prev_event` defined by
+ `/batch_send?prev_event_id=$abc` plus the outcome of
+ `persist_state_events_at_start`
app_service_requester: The requester of an application service.
Returns:
@@ -438,7 +470,7 @@ class RoomBatchHandler:
events_to_create=events_to_create,
room_id=room_id,
inherited_depth=inherited_depth,
- auth_event_ids=auth_event_ids,
+ initial_state_event_ids=initial_state_event_ids,
app_service_requester=app_service_requester,
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index a582837cf0..a33fa34aa8 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -272,6 +272,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
+ state_event_ids: Optional[List[str]] = None,
txn_id: Optional[str] = None,
ratelimit: bool = True,
content: Optional[dict] = None,
@@ -298,6 +299,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
+ state_event_ids:
+ The full state at a given event. This is used particularly by the MSC2716
+ /batch_send endpoint. One use case is the historical `state_events_at_start`;
+ since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
+ have any `state_ids` set and therefore can't derive any state even though the
+ prev_events are set so we need to set them ourself via this argument.
+ This should normally be left as None, which will cause the auth_event_ids
+ to be calculated based on the room state at the prev_events.
txn_id:
ratelimit:
@@ -353,6 +362,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
+ state_event_ids=state_event_ids,
require_consent=require_consent,
outlier=outlier,
historical=historical,
@@ -456,6 +466,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
+ state_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
"""Update a user's membership in a room.
@@ -487,6 +498,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
+ state_event_ids:
+ The full state at a given event. This is used particularly by the MSC2716
+ /batch_send endpoint. One use case is the historical `state_events_at_start`;
+ since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
+ have any `state_ids` set and therefore can't derive any state even though the
+ prev_events are set so we need to set them ourself via this argument.
+ This should normally be left as None, which will cause the auth_event_ids
+ to be calculated based on the room state at the prev_events.
Returns:
A tuple of the new event ID and stream ID.
@@ -526,6 +545,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
+ state_event_ids=state_event_ids,
)
return result
@@ -548,6 +568,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
+ state_event_ids: Optional[List[str]] = None,
) -> Tuple[str, int]:
"""Helper for update_membership.
@@ -581,6 +602,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
The event ids to use as the auth_events for the new event.
Should normally be left as None, which will cause them to be calculated
based on the room state at the prev_events.
+ state_event_ids:
+ The full state at a given event. This is used particularly by the MSC2716
+ /batch_send endpoint. One use case is the historical `state_events_at_start`;
+ since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
+ have any `state_ids` set and therefore can't derive any state even though the
+ prev_events are set so we need to set them ourself via this argument.
+ This should normally be left as None, which will cause the auth_event_ids
+ to be calculated based on the room state at the prev_events.
Returns:
A tuple of the new event ID and stream ID.
@@ -708,6 +737,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
+ state_event_ids=state_event_ids,
content=content,
require_consent=require_consent,
outlier=outlier,
@@ -932,6 +962,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit=ratelimit,
prev_event_ids=latest_event_ids,
auth_event_ids=auth_event_ids,
+ state_event_ids=state_event_ids,
content=content,
require_consent=require_consent,
outlier=outlier,
@@ -1736,8 +1767,8 @@ class RoomMemberMasterHandler(RoomMemberHandler):
txn_id=txn_id,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
+ outlier=True,
)
- event.internal_metadata.outlier = True
event.internal_metadata.out_of_band_membership = True
result_event = await self.event_creation_handler.handle_new_client_event(
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 3979cbba71..486145f48a 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -295,7 +295,7 @@ class RoomSummaryHandler:
# inaccessible to the requesting user.
if room_entry:
# Add the room (including the stripped m.space.child events).
- rooms_result.append(room_entry.as_json())
+ rooms_result.append(room_entry.as_json(for_client=True))
# If this room is not at the max-depth, check if there are any
# children to process.
@@ -843,14 +843,25 @@ class _RoomEntry:
# This may not include all children.
children_state_events: Sequence[JsonDict] = ()
- def as_json(self) -> JsonDict:
+ def as_json(self, for_client: bool = False) -> JsonDict:
"""
Returns a JSON dictionary suitable for the room hierarchy endpoint.
It returns the room summary including the stripped m.space.child events
as a sub-key.
+
+ Args:
+ for_client: If true, any server-server only fields are stripped from
+ the result.
+
"""
result = dict(self.room)
+
+ # Before returning to the client, remove the allowed_room_ids key, if it
+ # exists.
+ if for_client:
+ result.pop("allowed_room_ids", False)
+
result["children_state"] = self.children_state_events
return result
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index aa16e417eb..30eddda65f 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -54,6 +54,7 @@ class SearchHandler:
self.clock = hs.get_clock()
self.hs = hs
self._event_serializer = hs.get_event_client_serializer()
+ self._relations_handler = hs.get_relations_handler()
self.storage = hs.get_storage()
self.state_store = self.storage.state
self.auth = hs.get_auth()
@@ -354,7 +355,7 @@ class SearchHandler:
aggregations = None
if self._msc3666_enabled:
- aggregations = await self.store.get_bundled_aggregations(
+ aggregations = await self._relations_handler.get_bundled_aggregations(
# Generate an iterable of EventBase for all the events that will be
# returned, including contextual events.
itertools.chain(
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index ff5b5169ca..4f02a060d9 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -132,6 +132,7 @@ class UserAttributes:
# if `None`, the mapper has not picked a userid, and the user should be prompted to
# enter one.
localpart: Optional[str]
+ confirm_localpart: bool = False
display_name: Optional[str] = None
emails: Collection[str] = attr.Factory(list)
@@ -561,9 +562,10 @@ class SsoHandler:
# Must provide either attributes or session, not both
assert (attributes is not None) != (session is not None)
- if (attributes and attributes.localpart is None) or (
- session and session.chosen_localpart is None
- ):
+ if (
+ attributes
+ and (attributes.localpart is None or attributes.confirm_localpart is True)
+ ) or (session and session.chosen_localpart is None):
return b"/_synapse/client/pick_username/account_details"
elif self._consent_at_registration and not (
session and session.terms_accepted_version
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0aa3052fd6..6c569cfb1c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -28,16 +28,16 @@ from typing import (
import attr
from prometheus_client import Counter
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership, ReceiptTypes
+from synapse.api.constants import EventTypes, Membership, ReceiptTypes
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
+from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.databases.main.event_push_actions import NotifCounts
-from synapse.storage.databases.main.relations import BundledAggregations
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -269,6 +269,7 @@ class SyncHandler:
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
+ self._relations_handler = hs.get_relations_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
@@ -638,8 +639,10 @@ class SyncHandler:
# as clients will have all the necessary information.
bundled_aggregations = None
if limited or newly_joined_room:
- bundled_aggregations = await self.store.get_bundled_aggregations(
- recents, sync_config.user.to_string()
+ bundled_aggregations = (
+ await self._relations_handler.get_bundled_aggregations(
+ recents, sync_config.user.to_string()
+ )
)
return TimelineBatch(
@@ -1601,7 +1604,7 @@ class SyncHandler:
return set(), set(), set(), set()
# 3. Work out which rooms need reporting in the sync response.
- ignored_users = await self._get_ignored_users(user_id)
+ ignored_users = await self.store.ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
sync_result_builder, ignored_users
@@ -1627,7 +1630,6 @@ class SyncHandler:
logger.debug("Generating room entry for %s", room_entry.room_id)
await self._generate_room_entry(
sync_result_builder,
- ignored_users,
room_entry,
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
tags=tags_by_room.get(room_entry.room_id),
@@ -1657,29 +1659,6 @@ class SyncHandler:
newly_left_users,
)
- async def _get_ignored_users(self, user_id: str) -> FrozenSet[str]:
- """Retrieve the users ignored by the given user from their global account_data.
-
- Returns an empty set if
- - there is no global account_data entry for ignored_users
- - there is such an entry, but it's not a JSON object.
- """
- # TODO: Can we `SELECT ignored_user_id FROM ignored_users WHERE ignorer_user_id=?;` instead?
- ignored_account_data = (
- await self.store.get_global_account_data_by_type_for_user(
- user_id=user_id, data_type=AccountDataTypes.IGNORED_USER_LIST
- )
- )
-
- # If there is ignored users account data and it matches the proper type,
- # then use it.
- ignored_users: FrozenSet[str] = frozenset()
- if ignored_account_data:
- ignored_users_data = ignored_account_data.get("ignored_users", {})
- if isinstance(ignored_users_data, dict):
- ignored_users = frozenset(ignored_users_data.keys())
- return ignored_users
-
async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
) -> bool:
@@ -2022,7 +2001,6 @@ class SyncHandler:
async def _generate_room_entry(
self,
sync_result_builder: "SyncResultBuilder",
- ignored_users: FrozenSet[str],
room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict],
tags: Optional[Dict[str, Dict[str, Any]]],
@@ -2051,7 +2029,6 @@ class SyncHandler:
Args:
sync_result_builder
- ignored_users: Set of users ignored by user.
room_builder
ephemeral: List of new ephemeral events for room
tags: List of *all* tags for room, or None if there has been
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 843c68eb0f..6854428b7c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -160,8 +160,9 @@ class FollowerTypingHandler:
"""Should be called whenever we receive updates for typing stream."""
if self._latest_room_serial > token:
- # The master has gone backwards. To prevent inconsistent data, just
- # clear everything.
+ # The typing worker has gone backwards (e.g. it may have restarted).
+ # To prevent inconsistent data, just clear everything.
+ logger.info("Typing handler stream went backwards; resetting")
self._reset()
# Set the latest serial token to whatever the server gave us.
@@ -486,9 +487,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
if handler._room_serials[room_id] <= from_key:
continue
- if not await service.matches_user_in_member_list(
- room_id, self._main_store
- ):
+ if not await service.is_interested_in_room(room_id, self._main_store):
continue
events.append(self._make_event_for(room_id))
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d27ed2be6a..048fd4bb82 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -19,8 +19,8 @@ import synapse.metrics
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.databases.main.user_directory import SearchResult
from synapse.storage.roommember import ProfileInfo
-from synapse.types import JsonDict
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -78,7 +78,7 @@ class UserDirectoryHandler(StateDeltasHandler):
async def search_users(
self, user_id: str, search_term: str, limit: int
- ) -> JsonDict:
+ ) -> SearchResult:
"""Searches for users in directory
Returns:
|