summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/account_data.py10
-rw-r--r--synapse/handlers/appservice.py39
-rw-r--r--synapse/handlers/device.py7
-rw-r--r--synapse/handlers/devicemessage.py6
-rw-r--r--synapse/handlers/e2e_keys.py29
-rw-r--r--synapse/handlers/federation.py6
-rw-r--r--synapse/handlers/federation_event.py10
-rw-r--r--synapse/handlers/initial_sync.py19
-rw-r--r--synapse/handlers/message.py42
-rw-r--r--synapse/handlers/pagination.py8
-rw-r--r--synapse/handlers/presence.py6
-rw-r--r--synapse/handlers/receipts.py106
-rw-r--r--synapse/handlers/relations.py20
-rw-r--r--synapse/handlers/room.py150
-rw-r--r--synapse/handlers/search.py10
-rw-r--r--synapse/handlers/sync.py23
-rw-r--r--synapse/handlers/typing.py4
17 files changed, 296 insertions, 199 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py

index 4af9fbc5d1..0478448b47 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py
@@ -23,7 +23,7 @@ from synapse.replication.http.account_data import ( ReplicationUserAccountDataRestServlet, ) from synapse.streams import EventSource -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, StreamKeyType, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -105,7 +105,7 @@ class AccountDataHandler: ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) await self._notify_modules(user_id, room_id, account_data_type, content) @@ -141,7 +141,7 @@ class AccountDataHandler: ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) await self._notify_modules(user_id, None, account_data_type, content) @@ -176,7 +176,7 @@ class AccountDataHandler: ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) return max_stream_id else: @@ -201,7 +201,7 @@ class AccountDataHandler: ) self._notifier.on_new_event( - "account_data_key", max_stream_id, users=[user_id] + StreamKeyType.ACCOUNT_DATA, max_stream_id, users=[user_id] ) return max_stream_id else: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 85bd5e4768..1da7bcc85b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py
@@ -38,6 +38,7 @@ from synapse.types import ( JsonDict, RoomAlias, RoomStreamToken, + StreamKeyType, UserID, ) from synapse.util.async_helpers import Linearizer @@ -213,8 +214,8 @@ class ApplicationServicesHandler: Args: stream_key: The stream the event came from. - `stream_key` can be "typing_key", "receipt_key", "presence_key", - "to_device_key" or "device_list_key". Any other value for `stream_key` + `stream_key` can be StreamKeyType.TYPING, StreamKeyType.RECEIPT, StreamKeyType.PRESENCE, + StreamKeyType.TO_DEVICE or StreamKeyType.DEVICE_LIST. Any other value for `stream_key` will cause this function to return early. Ephemeral events will only be pushed to appservices that have opted into @@ -235,11 +236,11 @@ class ApplicationServicesHandler: # Only the following streams are currently supported. # FIXME: We should use constants for these values. if stream_key not in ( - "typing_key", - "receipt_key", - "presence_key", - "to_device_key", - "device_list_key", + StreamKeyType.TYPING, + StreamKeyType.RECEIPT, + StreamKeyType.PRESENCE, + StreamKeyType.TO_DEVICE, + StreamKeyType.DEVICE_LIST, ): return @@ -258,14 +259,14 @@ class ApplicationServicesHandler: # Ignore to-device messages if the feature flag is not enabled if ( - stream_key == "to_device_key" + stream_key == StreamKeyType.TO_DEVICE and not self._msc2409_to_device_messages_enabled ): return # Ignore device lists if the feature flag is not enabled if ( - stream_key == "device_list_key" + stream_key == StreamKeyType.DEVICE_LIST and not self._msc3202_transaction_extensions_enabled ): return @@ -283,15 +284,15 @@ class ApplicationServicesHandler: if ( stream_key in ( - "typing_key", - "receipt_key", - "presence_key", - "to_device_key", + StreamKeyType.TYPING, + StreamKeyType.RECEIPT, + StreamKeyType.PRESENCE, + StreamKeyType.TO_DEVICE, ) and service.supports_ephemeral ) or ( - stream_key == "device_list_key" + stream_key == StreamKeyType.DEVICE_LIST and service.msc3202_transaction_extensions ) ] @@ -317,7 +318,7 @@ class ApplicationServicesHandler: logger.debug("Checking interested services for %s", stream_key) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: - if stream_key == "typing_key": + if stream_key == StreamKeyType.TYPING: # Note that we don't persist the token (via set_appservice_stream_type_pos) # for typing_key due to performance reasons and due to their highly # ephemeral nature. @@ -333,7 +334,7 @@ class ApplicationServicesHandler: async with self._ephemeral_events_linearizer.queue( (service.id, stream_key) ): - if stream_key == "receipt_key": + if stream_key == StreamKeyType.RECEIPT: events = await self._handle_receipts(service, new_token) self.scheduler.enqueue_for_appservice(service, ephemeral=events) @@ -342,7 +343,7 @@ class ApplicationServicesHandler: service, "read_receipt", new_token ) - elif stream_key == "presence_key": + elif stream_key == StreamKeyType.PRESENCE: events = await self._handle_presence(service, users, new_token) self.scheduler.enqueue_for_appservice(service, ephemeral=events) @@ -351,7 +352,7 @@ class ApplicationServicesHandler: service, "presence", new_token ) - elif stream_key == "to_device_key": + elif stream_key == StreamKeyType.TO_DEVICE: # Retrieve a list of to-device message events, as well as the # maximum stream token of the messages we were able to retrieve. to_device_messages = await self._get_to_device_messages( @@ -366,7 +367,7 @@ class ApplicationServicesHandler: service, "to_device", new_token ) - elif stream_key == "device_list_key": + elif stream_key == StreamKeyType.DEVICE_LIST: device_list_summary = await self._get_device_list_summary( service, new_token ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index a91b1ee4d5..1d6d1f8a92 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -43,6 +43,7 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.types import ( JsonDict, + StreamKeyType, StreamToken, UserID, get_domain_from_id, @@ -502,7 +503,7 @@ class DeviceHandler(DeviceWorkerHandler): # specify the user ID too since the user should always get their own device list # updates, even if they aren't in any rooms. self.notifier.on_new_event( - "device_list_key", position, users={user_id}, rooms=room_ids + StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids ) # We may need to do some processing asynchronously for local user IDs. @@ -523,7 +524,9 @@ class DeviceHandler(DeviceWorkerHandler): from_user_id, user_ids ) - self.notifier.on_new_event("device_list_key", position, users=[from_user_id]) + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, position, users=[from_user_id] + ) async def user_left_room(self, user: UserID, room_id: str) -> None: user_id = user.to_string() diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 4cb725d027..53668cce3b 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py
@@ -26,7 +26,7 @@ from synapse.logging.opentracing import ( set_tag, ) from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet -from synapse.types import JsonDict, Requester, UserID, get_domain_from_id +from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.util import json_encoder from synapse.util.stringutils import random_string @@ -151,7 +151,7 @@ class DeviceMessageHandler: # Notify listeners that there are new to-device messages to process, # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", last_stream_id, users=local_messages.keys() + StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys() ) async def _check_for_unknown_devices( @@ -285,7 +285,7 @@ class DeviceMessageHandler: # Notify listeners that there are new to-device messages to process, # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", last_stream_id, users=local_messages.keys() + StreamKeyType.TO_DEVICE, last_stream_id, users=local_messages.keys() ) if self.federation_sender: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d6714228ef..e6c2cfb8c8 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py
@@ -15,7 +15,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple import attr from canonicaljson import encode_canonical_json @@ -1105,22 +1105,19 @@ class E2eKeysHandler: # can request over federation raise NotFoundError("No %s key found for %s" % (key_type, user_id)) - ( - key, - key_id, - verify_key, - ) = await self._retrieve_cross_signing_keys_for_remote_user(user, key_type) - - if key is None: + cross_signing_keys = await self._retrieve_cross_signing_keys_for_remote_user( + user, key_type + ) + if cross_signing_keys is None: raise NotFoundError("No %s key found for %s" % (key_type, user_id)) - return key, key_id, verify_key + return cross_signing_keys async def _retrieve_cross_signing_keys_for_remote_user( self, user: UserID, desired_key_type: str, - ) -> Tuple[Optional[dict], Optional[str], Optional[VerifyKey]]: + ) -> Optional[Tuple[Dict[str, Any], str, VerifyKey]]: """Queries cross-signing keys for a remote user and saves them to the database Only the key specified by `key_type` will be returned, while all retrieved keys @@ -1146,12 +1143,10 @@ class E2eKeysHandler: type(e), e, ) - return None, None, None + return None # Process each of the retrieved cross-signing keys - desired_key = None - desired_key_id = None - desired_verify_key = None + desired_key_data = None retrieved_device_ids = [] for key_type in ["master", "self_signing"]: key_content = remote_result.get(key_type + "_key") @@ -1196,9 +1191,7 @@ class E2eKeysHandler: # If this is the desired key type, save it and its ID/VerifyKey if key_type == desired_key_type: - desired_key = key_content - desired_verify_key = verify_key - desired_key_id = key_id + desired_key_data = key_content, key_id, verify_key # At the same time, store this key in the db for subsequent queries await self.store.set_e2e_cross_signing_key( @@ -1212,7 +1205,7 @@ class E2eKeysHandler: user.to_string(), retrieved_device_ids ) - return desired_key, desired_key_id, desired_verify_key + return desired_key_data def _check_cross_signing_key( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 38dc5b1f6e..be5099b507 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -659,7 +659,7 @@ class FederationHandler: # in the invitee's sync stream. It is stripped out for all other local users. event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"] - context = EventContext.for_outlier() + context = EventContext.for_outlier(self.storage) stream_id = await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] ) @@ -848,7 +848,7 @@ class FederationHandler: ) ) - context = EventContext.for_outlier() + context = EventContext.for_outlier(self.storage) await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] ) @@ -877,7 +877,7 @@ class FederationHandler: await self.federation_client.send_leave(host_list, event) - context = EventContext.for_outlier() + context = EventContext.for_outlier(self.storage) stream_id = await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] ) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 6cf927e4ff..761caa04b7 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py
@@ -103,7 +103,7 @@ class FederationEventHandler: self._event_creation_handler = hs.get_event_creation_handler() self._event_auth_handler = hs.get_event_auth_handler() self._message_handler = hs.get_message_handler() - self._action_generator = hs.get_action_generator() + self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() self._state_resolution_handler = hs.get_state_resolution_handler() # avoid a circular dependency by deferring execution here self._get_room_member_handler = hs.get_room_member_handler @@ -1423,7 +1423,7 @@ class FederationEventHandler: # we're not bothering about room state, so flag the event as an outlier. event.internal_metadata.outlier = True - context = EventContext.for_outlier() + context = EventContext.for_outlier(self._storage) try: validate_event_for_room_version(room_version_obj, event) check_auth_rules_for_event(room_version_obj, event, auth) @@ -1874,10 +1874,10 @@ class FederationEventHandler: ) return EventContext.with_state( + storage=self._storage, state_group=state_group, state_group_before_event=context.state_group_before_event, - current_state_ids=current_state_ids, - prev_state_ids=prev_state_ids, + state_delta_due_to_event=state_updates, prev_group=prev_group, delta_ids=state_updates, partial_state=context.partial_state, @@ -1913,7 +1913,7 @@ class FederationEventHandler: min_depth, ) else: - await self._action_generator.handle_push_actions_for_event( + await self._bulk_push_rule_evaluator.action_for_event_by_user( event, context ) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 7b94770f97..d79248ad90 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py
@@ -30,6 +30,7 @@ from synapse.types import ( Requester, RoomStreamToken, StateMap, + StreamKeyType, StreamToken, UserID, ) @@ -143,7 +144,7 @@ class InitialSyncHandler: to_key=int(now_token.receipt_key), ) if self.hs.config.experimental.msc2285_enabled: - receipt = ReceiptEventSource.filter_out_private(receipt, user_id) + receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id) tags_by_room = await self.store.get_tags_for_user(user_id) @@ -220,8 +221,10 @@ class InitialSyncHandler: self.storage, user_id, messages ) - start_token = now_token.copy_and_replace("room_key", token) - end_token = now_token.copy_and_replace("room_key", room_end_token) + start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token) + end_token = now_token.copy_and_replace( + StreamKeyType.ROOM, room_end_token + ) time_now = self.clock.time_msec() d["messages"] = { @@ -369,8 +372,8 @@ class InitialSyncHandler: self.storage, user_id, messages, is_peeking=is_peeking ) - start_token = StreamToken.START.copy_and_replace("room_key", token) - end_token = StreamToken.START.copy_and_replace("room_key", stream_token) + start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token) + end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token) time_now = self.clock.time_msec() @@ -449,7 +452,9 @@ class InitialSyncHandler: if not receipts: return [] if self.hs.config.experimental.msc2285_enabled: - receipts = ReceiptEventSource.filter_out_private(receipts, user_id) + receipts = ReceiptEventSource.filter_out_private_receipts( + receipts, user_id + ) return receipts presence, receipts, (messages, token) = await make_deferred_yieldable( @@ -472,7 +477,7 @@ class InitialSyncHandler: self.storage, user_id, messages, is_peeking=is_peeking ) - start_token = now_token.copy_and_replace("room_key", token) + start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token) end_token = now_token time_now = self.clock.time_msec() diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c28b792e6f..0951b9c71f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -44,7 +44,7 @@ from synapse.api.errors import ( from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.api.urls import ConsentURIBuilder from synapse.event_auth import validate_event_for_room_version -from synapse.events import EventBase +from synapse.events import EventBase, relation_from_event from synapse.events.builder import EventBuilder from synapse.events.snapshot import EventContext from synapse.events.validator import EventValidator @@ -426,7 +426,7 @@ class EventCreationHandler: # This is to stop us from diverging history *too* much. self.limiter = Linearizer(max_count=5, name="room_event_creation_limit") - self.action_generator = hs.get_action_generator() + self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() self.spam_checker = hs.get_spam_checker() self.third_party_event_rules: "ThirdPartyEventRules" = ( @@ -757,6 +757,10 @@ class EventCreationHandler: The previous version of the event is returned, if it is found in the event context. Otherwise, None is returned. """ + if event.internal_metadata.is_outlier(): + # This can happen due to out of band memberships + return None + prev_state_ids = await context.get_prev_state_ids() prev_event_id = prev_state_ids.get((event.type, event.state_key)) if not prev_event_id: @@ -1001,7 +1005,7 @@ class EventCreationHandler: # after it is created if builder.internal_metadata.outlier: event.internal_metadata.outlier = True - context = EventContext.for_outlier() + context = EventContext.for_outlier(self.storage) elif ( event.type == EventTypes.MSC2716_INSERTION and state_event_ids @@ -1056,20 +1060,11 @@ class EventCreationHandler: SynapseError if the event is invalid. """ - relation = event.content.get("m.relates_to") + relation = relation_from_event(event) if not relation: return - relation_type = relation.get("rel_type") - if not relation_type: - return - - # Ensure the parent is real. - relates_to = relation.get("event_id") - if not relates_to: - return - - parent_event = await self.store.get_event(relates_to, allow_none=True) + parent_event = await self.store.get_event(relation.parent_id, allow_none=True) if parent_event: # And in the same room. if parent_event.room_id != event.room_id: @@ -1078,28 +1073,31 @@ class EventCreationHandler: else: # There must be some reason that the client knows the event exists, # see if there are existing relations. If so, assume everything is fine. - if not await self.store.event_is_target_of_relation(relates_to): + if not await self.store.event_is_target_of_relation(relation.parent_id): # Otherwise, the client can't know about the parent event! raise SynapseError(400, "Can't send relation to unknown event") # If this event is an annotation then we check that that the sender # can't annotate the same way twice (e.g. stops users from liking an # event multiple times). - if relation_type == RelationTypes.ANNOTATION: - aggregation_key = relation["key"] + if relation.rel_type == RelationTypes.ANNOTATION: + aggregation_key = relation.aggregation_key + + if aggregation_key is None: + raise SynapseError(400, "Missing aggregation key") if len(aggregation_key) > 500: raise SynapseError(400, "Aggregation key is too long") already_exists = await self.store.has_user_annotated_event( - relates_to, event.type, aggregation_key, event.sender + relation.parent_id, event.type, aggregation_key, event.sender ) if already_exists: raise SynapseError(400, "Can't send same reaction twice") # Don't attempt to start a thread if the parent event is a relation. - elif relation_type == RelationTypes.THREAD: - if await self.store.event_includes_relation(relates_to): + elif relation.rel_type == RelationTypes.THREAD: + if await self.store.event_includes_relation(relation.parent_id): raise SynapseError( 400, "Cannot start threads from an event with a relation" ) @@ -1245,7 +1243,9 @@ class EventCreationHandler: # and `state_groups` because they have `prev_events` that aren't persisted yet # (historical messages persisted in reverse-chronological order). if not event.internal_metadata.is_historical(): - await self.action_generator.handle_push_actions_for_event(event, context) + await self._bulk_push_rule_evaluator.action_for_event_by_user( + event, context + ) try: # If we're a worker we need to hit out to the master. diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 7ee3340373..6ae88add95 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py
@@ -27,7 +27,7 @@ from synapse.handlers.room import ShutdownRoomResponse from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, Requester +from synapse.types import JsonDict, Requester, StreamKeyType from synapse.util.async_helpers import ReadWriteLock from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client @@ -448,7 +448,7 @@ class PaginationHandler: ) # We expect `/messages` to use historic pagination tokens by default but # `/messages` should still works with live tokens when manually provided. - assert from_token.room_key.topological + assert from_token.room_key.topological is not None if pagin_config.limit is None: # This shouldn't happen as we've set a default limit before this @@ -491,7 +491,7 @@ class PaginationHandler: if leave_token.topological < curr_topo: from_token = from_token.copy_and_replace( - "room_key", leave_token + StreamKeyType.ROOM, leave_token ) await self.hs.get_federation_handler().maybe_backfill( @@ -513,7 +513,7 @@ class PaginationHandler: event_filter=event_filter, ) - next_token = from_token.copy_and_replace("room_key", next_key) + next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key) if events: if event_filter: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 268481ec19..dd84e6c88b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -66,7 +66,7 @@ from synapse.replication.tcp.commands import ClearUserSyncsCommand from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream from synapse.storage.databases.main import DataStore from synapse.streams import EventSource -from synapse.types import JsonDict, UserID, get_domain_from_id +from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.metrics import Measure @@ -522,7 +522,7 @@ class WorkerPresenceHandler(BasePresenceHandler): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", + StreamKeyType.PRESENCE, stream_id, rooms=room_ids_to_states.keys(), users=users_to_states.keys(), @@ -1145,7 +1145,7 @@ class PresenceHandler(BasePresenceHandler): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", + StreamKeyType.PRESENCE, stream_id, rooms=room_ids_to_states.keys(), users=[UserID.from_string(u) for u in users_to_states], diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 43d615357b..e6a35f1d09 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py
@@ -17,7 +17,13 @@ from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple from synapse.api.constants import ReceiptTypes from synapse.appservice import ApplicationService from synapse.streams import EventSource -from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id +from synapse.types import ( + JsonDict, + ReadReceipt, + StreamKeyType, + UserID, + get_domain_from_id, +) if TYPE_CHECKING: from synapse.server import HomeServer @@ -129,7 +135,9 @@ class ReceiptsHandler: affected_room_ids = list({r.room_id for r in receipts}) - self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids) + self.notifier.on_new_event( + StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids + ) # Note that the min here shouldn't be relied upon to be accurate. await self.hs.get_pusherpool().on_new_receipts( min_batch_id, max_batch_id, affected_room_ids @@ -165,43 +173,69 @@ class ReceiptEventSource(EventSource[int, JsonDict]): self.config = hs.config @staticmethod - def filter_out_private(events: List[JsonDict], user_id: str) -> List[JsonDict]: - """ - This method takes in what is returned by - get_linearized_receipts_for_rooms() and goes through read receipts - filtering out m.read.private receipts if they were not sent by the - current user. + def filter_out_private_receipts( + rooms: List[JsonDict], user_id: str + ) -> List[JsonDict]: """ + Filters a list of serialized receipts (as returned by /sync and /initialSync) + and removes private read receipts of other users. - visible_events = [] - - # filter out private receipts the user shouldn't see - for event in events: - content = event.get("content", {}) - new_event = event.copy() - new_event["content"] = {} - - for event_id, event_content in content.items(): - receipt_event = {} - for receipt_type, receipt_content in event_content.items(): - if receipt_type == ReceiptTypes.READ_PRIVATE: - user_rr = receipt_content.get(user_id, None) - if user_rr: - receipt_event[ReceiptTypes.READ_PRIVATE] = { - user_id: user_rr.copy() - } - else: - receipt_event[receipt_type] = receipt_content.copy() + This operates on the return value of get_linearized_receipts_for_rooms(), + which is wrapped in a cache. Care must be taken to ensure that the input + values are not modified. - # Only include the receipt event if it is non-empty. - if receipt_event: - new_event["content"][event_id] = receipt_event + Args: + rooms: A list of mappings, each mapping has a `content` field, which + is a map of event ID -> receipt type -> user ID -> receipt information. - # Append new_event to visible_events unless empty - if len(new_event["content"].keys()) > 0: - visible_events.append(new_event) + Returns: + The same as rooms, but filtered. + """ - return visible_events + result = [] + + # Iterate through each room's receipt content. + for room in rooms: + # The receipt content with other user's private read receipts removed. + content = {} + + # Iterate over each event ID / receipts for that event. + for event_id, orig_event_content in room.get("content", {}).items(): + event_content = orig_event_content + # If there are private read receipts, additional logic is necessary. + if ReceiptTypes.READ_PRIVATE in event_content: + # Make a copy without private read receipts to avoid leaking + # other user's private read receipts.. + event_content = { + receipt_type: receipt_value + for receipt_type, receipt_value in event_content.items() + if receipt_type != ReceiptTypes.READ_PRIVATE + } + + # Copy the current user's private read receipt from the + # original content, if it exists. + user_private_read_receipt = orig_event_content[ + ReceiptTypes.READ_PRIVATE + ].get(user_id, None) + if user_private_read_receipt: + event_content[ReceiptTypes.READ_PRIVATE] = { + user_id: user_private_read_receipt + } + + # Include the event if there is at least one non-private read + # receipt or the current user has a private read receipt. + if event_content: + content[event_id] = event_content + + # Include the event if there is at least one non-private read receipt + # or the current user has a private read receipt. + if content: + # Build a new event to avoid mutating the cache. + new_room = {k: v for k, v in room.items() if k != "content"} + new_room["content"] = content + result.append(new_room) + + return result async def get_new_events( self, @@ -223,7 +257,9 @@ class ReceiptEventSource(EventSource[int, JsonDict]): ) if self.config.experimental.msc2285_enabled: - events = ReceiptEventSource.filter_out_private(events, user.to_string()) + events = ReceiptEventSource.filter_out_private_receipts( + events, user.to_string() + ) return events, to_key diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index c2754ec918..ab7e54857d 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py
@@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import collections.abc import logging from typing import ( TYPE_CHECKING, @@ -28,7 +27,7 @@ import attr from synapse.api.constants import RelationTypes from synapse.api.errors import SynapseError -from synapse.events import EventBase +from synapse.events import EventBase, relation_from_event from synapse.storage.databases.main.relations import _RelatedEvent from synapse.types import JsonDict, Requester, StreamToken, UserID from synapse.visibility import filter_events_for_client @@ -373,20 +372,21 @@ class RelationsHandler: if event.is_state(): continue - relates_to = event.content.get("m.relates_to") - relation_type = None - if isinstance(relates_to, collections.abc.Mapping): - relation_type = relates_to.get("rel_type") + relates_to = relation_from_event(event) + if relates_to: # An event which is a replacement (ie edit) or annotation (ie, # reaction) may not have any other event related to it. - if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): + if relates_to.rel_type in ( + RelationTypes.ANNOTATION, + RelationTypes.REPLACE, + ): continue + # Track the event's relation information for later. + relations_by_id[event.event_id] = relates_to.rel_type + # The event should get bundled aggregations. events_by_id[event.event_id] = event - # Track the event's relation information for later. - if isinstance(relation_type, str): - relations_by_id[event.event_id] = relation_type # event ID -> bundled aggregation in non-serialized form. results: Dict[str, BundledAggregations] = {} diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 604eb6ec15..a2973109ad 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py
@@ -33,6 +33,7 @@ from typing import ( import attr from typing_extensions import TypedDict +import synapse.events.snapshot from synapse.api.constants import ( EventContentFields, EventTypes, @@ -72,12 +73,12 @@ from synapse.types import ( RoomID, RoomStreamToken, StateMap, + StreamKeyType, StreamToken, UserID, create_requester, ) from synapse.util import stringutils -from synapse.util.async_helpers import Linearizer from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_and_validate_server_name from synapse.visibility import filter_events_for_client @@ -149,10 +150,11 @@ class RoomCreationHandler: ) preset_config["encrypted"] = encrypted - self._replication = hs.get_replication_data_handler() + self._default_power_level_content_override = ( + self.config.room.default_power_level_content_override + ) - # linearizer to stop two upgrades happening at once - self._upgrade_linearizer = Linearizer("room_upgrade_linearizer") + self._replication = hs.get_replication_data_handler() # If a user tries to update the same room multiple times in quick # succession, only process the first attempt and return its result to @@ -196,6 +198,39 @@ class RoomCreationHandler: 400, "An upgrade for this room is currently in progress" ) + # Check whether the room exists and 404 if it doesn't. + # We could go straight for the auth check, but that will raise a 403 instead. + old_room = await self.store.get_room(old_room_id) + if old_room is None: + raise NotFoundError("Unknown room id %s" % (old_room_id,)) + + new_room_id = self._generate_room_id() + + # Check whether the user has the power level to carry out the upgrade. + # `check_auth_rules_from_context` will check that they are in the room and have + # the required power level to send the tombstone event. + ( + tombstone_event, + tombstone_context, + ) = await self.event_creation_handler.create_event( + requester, + { + "type": EventTypes.Tombstone, + "state_key": "", + "room_id": old_room_id, + "sender": user_id, + "content": { + "body": "This room has been replaced", + "replacement_room": new_room_id, + }, + }, + ) + old_room_version = await self.store.get_room_version(old_room_id) + validate_event_for_room_version(old_room_version, tombstone_event) + await self._event_auth_handler.check_auth_rules_from_context( + old_room_version, tombstone_event, tombstone_context + ) + # Upgrade the room # # If this user has sent multiple upgrade requests for the same room @@ -206,19 +241,35 @@ class RoomCreationHandler: self._upgrade_room, requester, old_room_id, - new_version, # args for _upgrade_room + old_room, # args for _upgrade_room + new_room_id, + new_version, + tombstone_event, + tombstone_context, ) return ret async def _upgrade_room( - self, requester: Requester, old_room_id: str, new_version: RoomVersion + self, + requester: Requester, + old_room_id: str, + old_room: Dict[str, Any], + new_room_id: str, + new_version: RoomVersion, + tombstone_event: EventBase, + tombstone_context: synapse.events.snapshot.EventContext, ) -> str: """ Args: requester: the user requesting the upgrade old_room_id: the id of the room to be replaced - new_versions: the version to upgrade the room to + old_room: a dict containing room information for the room to be replaced, + as returned by `RoomWorkerStore.get_room`. + new_room_id: the id of the replacement room + new_version: the version to upgrade the room to + tombstone_event: the tombstone event to send to the old room + tombstone_context: the context for the tombstone event Raises: ShadowBanError if the requester is shadow-banned. @@ -226,40 +277,15 @@ class RoomCreationHandler: user_id = requester.user.to_string() assert self.hs.is_mine_id(user_id), "User must be our own: %s" % (user_id,) - # start by allocating a new room id - r = await self.store.get_room(old_room_id) - if r is None: - raise NotFoundError("Unknown room id %s" % (old_room_id,)) - new_room_id = await self._generate_room_id( - creator_id=user_id, - is_public=r["is_public"], - room_version=new_version, - ) - logger.info("Creating new room %s to replace %s", new_room_id, old_room_id) - # we create and auth the tombstone event before properly creating the new - # room, to check our user has perms in the old room. - ( - tombstone_event, - tombstone_context, - ) = await self.event_creation_handler.create_event( - requester, - { - "type": EventTypes.Tombstone, - "state_key": "", - "room_id": old_room_id, - "sender": user_id, - "content": { - "body": "This room has been replaced", - "replacement_room": new_room_id, - }, - }, - ) - old_room_version = await self.store.get_room_version(old_room_id) - validate_event_for_room_version(old_room_version, tombstone_event) - await self._event_auth_handler.check_auth_rules_from_context( - old_room_version, tombstone_event, tombstone_context + # create the new room. may raise a `StoreError` in the exceedingly unlikely + # event of a room ID collision. + await self.store.store_room( + room_id=new_room_id, + room_creator_user_id=user_id, + is_public=old_room["is_public"], + room_version=new_version, ) await self.clone_existing_room( @@ -778,7 +804,7 @@ class RoomCreationHandler: visibility = config.get("visibility", "private") is_public = visibility == "public" - room_id = await self._generate_room_id( + room_id = await self._generate_and_create_room_id( creator_id=user_id, is_public=is_public, room_version=room_version, @@ -1042,9 +1068,19 @@ class RoomCreationHandler: for invitee in invite_list: power_level_content["users"][invitee] = 100 - # Power levels overrides are defined per chat preset + # If the user supplied a preset name e.g. "private_chat", + # we apply that preset power_level_content.update(config["power_level_content_override"]) + # If the server config contains default_power_level_content_override, + # and that contains information for this room preset, apply it. + if self._default_power_level_content_override: + override = self._default_power_level_content_override.get(preset_config) + if override is not None: + power_level_content.update(override) + + # Finally, if the user supplied specific permissions for this room, + # apply those. if power_level_content_override: power_level_content.update(power_level_content_override) @@ -1090,7 +1126,26 @@ class RoomCreationHandler: return last_sent_stream_id - async def _generate_room_id( + def _generate_room_id(self) -> str: + """Generates a random room ID. + + Room IDs look like "!opaque_id:domain" and are case-sensitive as per the spec + at https://spec.matrix.org/v1.2/appendices/#room-ids-and-event-ids. + + Does not check for collisions with existing rooms or prevent future calls from + returning the same room ID. To ensure the uniqueness of a new room ID, use + `_generate_and_create_room_id` instead. + + Synapse's room IDs are 18 [a-zA-Z] characters long, which comes out to around + 102 bits. + + Returns: + A random room ID of the form "!opaque_id:domain". + """ + random_string = stringutils.random_string(18) + return RoomID(random_string, self.hs.hostname).to_string() + + async def _generate_and_create_room_id( self, creator_id: str, is_public: bool, @@ -1101,8 +1156,7 @@ class RoomCreationHandler: attempts = 0 while attempts < 5: try: - random_string = stringutils.random_string(18) - gen_room_id = RoomID(random_string, self.hs.hostname).to_string() + gen_room_id = self._generate_room_id() await self.store.store_room( room_id=gen_room_id, room_creator_user_id=creator_id, @@ -1239,10 +1293,10 @@ class RoomContextHandler: events_after=events_after, state=await filter_evts(state_events), aggregations=aggregations, - start=await token.copy_and_replace("room_key", results.start).to_string( - self.store - ), - end=await token.copy_and_replace("room_key", results.end).to_string( + start=await token.copy_and_replace( + StreamKeyType.ROOM, results.start + ).to_string(self.store), + end=await token.copy_and_replace(StreamKeyType.ROOM, results.end).to_string( self.store ), ) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 5619f8f50e..cd1c47dae8 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py
@@ -24,7 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.storage.state import StateFilter -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, StreamKeyType, UserID from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -655,11 +655,11 @@ class SearchHandler: "events_before": events_before, "events_after": events_after, "start": await now_token.copy_and_replace( - "room_key", res.start + StreamKeyType.ROOM, res.start + ).to_string(self.store), + "end": await now_token.copy_and_replace( + StreamKeyType.ROOM, res.end ).to_string(self.store), - "end": await now_token.copy_and_replace("room_key", res.end).to_string( - self.store - ), } if include_profile: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2c555a66d0..4be08fe7cb 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -37,6 +37,7 @@ from synapse.types import ( Requester, RoomStreamToken, StateMap, + StreamKeyType, StreamToken, UserID, ) @@ -449,7 +450,7 @@ class SyncHandler: room_ids=room_ids, is_guest=sync_config.is_guest, ) - now_token = now_token.copy_and_replace("typing_key", typing_key) + now_token = now_token.copy_and_replace(StreamKeyType.TYPING, typing_key) ephemeral_by_room: JsonDict = {} @@ -471,7 +472,7 @@ class SyncHandler: room_ids=room_ids, is_guest=sync_config.is_guest, ) - now_token = now_token.copy_and_replace("receipt_key", receipt_key) + now_token = now_token.copy_and_replace(StreamKeyType.RECEIPT, receipt_key) for event in receipts: room_id = event["room_id"] @@ -537,7 +538,9 @@ class SyncHandler: prev_batch_token = now_token if recents: room_key = recents[0].internal_metadata.before - prev_batch_token = now_token.copy_and_replace("room_key", room_key) + prev_batch_token = now_token.copy_and_replace( + StreamKeyType.ROOM, room_key + ) return TimelineBatch( events=recents, prev_batch=prev_batch_token, limited=False @@ -611,7 +614,7 @@ class SyncHandler: recents = recents[-timeline_limit:] room_key = recents[0].internal_metadata.before - prev_batch_token = now_token.copy_and_replace("room_key", room_key) + prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key) # Don't bother to bundle aggregations if the timeline is unlimited, # as clients will have all the necessary information. @@ -1398,7 +1401,7 @@ class SyncHandler: now_token.to_device_key, ) sync_result_builder.now_token = now_token.copy_and_replace( - "to_device_key", stream_id + StreamKeyType.TO_DEVICE, stream_id ) sync_result_builder.to_device = messages else: @@ -1503,7 +1506,7 @@ class SyncHandler: ) assert presence_key sync_result_builder.now_token = now_token.copy_and_replace( - "presence_key", presence_key + StreamKeyType.PRESENCE, presence_key ) extra_users_ids = set(newly_joined_or_invited_users) @@ -1826,7 +1829,7 @@ class SyncHandler: # stream token as it'll only be used in the context of this # room. (c.f. the docstring of `to_room_stream_token`). leave_token = since_token.copy_and_replace( - "room_key", leave_position.to_room_stream_token() + StreamKeyType.ROOM, leave_position.to_room_stream_token() ) # If this is an out of band message, like a remote invite @@ -1875,7 +1878,9 @@ class SyncHandler: if room_entry: events, start_key = room_entry - prev_batch_token = now_token.copy_and_replace("room_key", start_key) + prev_batch_token = now_token.copy_and_replace( + StreamKeyType.ROOM, start_key + ) entry = RoomSyncResultBuilder( room_id=room_id, @@ -1972,7 +1977,7 @@ class SyncHandler: continue leave_token = now_token.copy_and_replace( - "room_key", RoomStreamToken(None, event.stream_ordering) + StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering) ) room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 6854428b7c..bb00750bfd 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py
@@ -25,7 +25,7 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.replication.tcp.streams import TypingStream from synapse.streams import EventSource -from synapse.types import JsonDict, Requester, UserID, get_domain_from_id +from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -382,7 +382,7 @@ class TypingWriterHandler(FollowerTypingHandler): ) self.notifier.on_new_event( - "typing_key", self._latest_room_serial, rooms=[member.room_id] + StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id] ) async def get_all_typing_updates(