diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/experimental.py | 31 | ||||
-rw-r--r-- | synapse/event_auth.py | 23 | ||||
-rw-r--r-- | synapse/events/snapshot.py | 1 | ||||
-rw-r--r-- | synapse/events/utils.py | 87 | ||||
-rw-r--r-- | synapse/handlers/event_auth.py | 13 | ||||
-rw-r--r-- | synapse/handlers/events.py | 20 | ||||
-rw-r--r-- | synapse/handlers/initial_sync.py | 51 | ||||
-rw-r--r-- | synapse/handlers/message.py | 9 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 4 | ||||
-rw-r--r-- | synapse/handlers/relations.py | 12 | ||||
-rw-r--r-- | synapse/handlers/room.py | 4 | ||||
-rw-r--r-- | synapse/handlers/search.py | 43 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 35 | ||||
-rw-r--r-- | synapse/rest/admin/server_notice_servlet.py | 34 | ||||
-rw-r--r-- | synapse/rest/client/events.py | 16 | ||||
-rw-r--r-- | synapse/rest/client/notifications.py | 12 | ||||
-rw-r--r-- | synapse/rest/client/room.py | 192 | ||||
-rw-r--r-- | synapse/rest/client/sendtodevice.py | 25 | ||||
-rw-r--r-- | synapse/rest/client/sync.py | 10 | ||||
-rw-r--r-- | synapse/rest/client/transactions.py | 55 | ||||
-rw-r--r-- | synapse/server.py | 2 | ||||
-rw-r--r-- | synapse/storage/database.py | 10 | ||||
-rw-r--r-- | synapse/storage/databases/main/end_to_end_keys.py | 24 |
23 files changed, 393 insertions, 320 deletions
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index fc64f2bda1..7e05f78f70 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -166,29 +166,9 @@ class ExperimentalConfig(Config): # MSC3391: Removing account data. self.msc3391_enabled = experimental.get("msc3391_enabled", False) - # MSC3925: do not replace events with their edits - self.msc3925_inhibit_edit = experimental.get("msc3925_inhibit_edit", False) - - # MSC3758: exact_event_match push rule condition - self.msc3758_exact_event_match = experimental.get( - "msc3758_exact_event_match", False - ) - - # MSC3873: Disambiguate event_match keys. - self.msc3873_escape_event_match_key = experimental.get( - "msc3873_escape_event_match_key", False - ) - - # MSC3966: exact_event_property_contains push rule condition. - self.msc3966_exact_event_property_contains = experimental.get( - "msc3966_exact_event_property_contains", False - ) - - # MSC3952: Intentional mentions, this depends on MSC3758 and MSC3966. - self.msc3952_intentional_mentions = ( - experimental.get("msc3952_intentional_mentions", False) - and self.msc3758_exact_event_match - and self.msc3966_exact_event_property_contains + # MSC3952: Intentional mentions, this depends on MSC3966. + self.msc3952_intentional_mentions = experimental.get( + "msc3952_intentional_mentions", False ) # MSC3959: Do not generate notifications for edits. @@ -196,10 +176,5 @@ class ExperimentalConfig(Config): "msc3958_supress_edit_notifs", False ) - # MSC3966: exact_event_property_contains push rule condition. - self.msc3966_exact_event_property_contains = experimental.get( - "msc3966_exact_event_property_contains", False - ) - # MSC3967: Do not require UIA when first uploading cross signing keys self.msc3967_enabled = experimental.get("msc3967_enabled", False) diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 4d6d1b8ebd..af55874b5c 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -168,13 +168,24 @@ async def check_state_independent_auth_rules( return # 2. Reject if event has auth_events that: ... - auth_events = await store.get_events( - event.auth_event_ids(), - redact_behaviour=EventRedactBehaviour.as_is, - allow_rejected=True, - ) if batched_auth_events: - auth_events.update(batched_auth_events) + # Copy the batched auth events to avoid mutating them. + auth_events = dict(batched_auth_events) + needed_auth_event_ids = set(event.auth_event_ids()) - batched_auth_events.keys() + if needed_auth_event_ids: + auth_events.update( + await store.get_events( + needed_auth_event_ids, + redact_behaviour=EventRedactBehaviour.as_is, + allow_rejected=True, + ) + ) + else: + auth_events = await store.get_events( + event.auth_event_ids(), + redact_behaviour=EventRedactBehaviour.as_is, + allow_rejected=True, + ) room_id = event.room_id auth_dict: MutableStateMap[str] = {} diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a91a5d1e3c..c04ad08cbb 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -293,6 +293,7 @@ class EventContext(UnpersistedEventContextBase): Maps a (type, state_key) to the event ID of the state event matching this tuple. """ + assert self.state_group_before_event is not None return await self._storage.state.get_state_ids_for_group( self.state_group_before_event, state_filter diff --git a/synapse/events/utils.py b/synapse/events/utils.py index eaa6cad4af..b9c15ffcdb 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -38,8 +38,7 @@ from synapse.api.constants import ( ) from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersion -from synapse.types import JsonDict -from synapse.util.frozenutils import unfreeze +from synapse.types import JsonDict, Requester from . import EventBase @@ -317,8 +316,9 @@ class SerializeEventConfig: as_client_event: bool = True # Function to convert from federation format to client format event_format: Callable[[JsonDict], JsonDict] = format_event_for_client_v1 - # ID of the user's auth token - used for namespacing of transaction IDs - token_id: Optional[int] = None + # The entity that requested the event. This is used to determine whether to include + # the transaction_id in the unsigned section of the event. + requester: Optional[Requester] = None # List of event fields to include. If empty, all fields will be returned. only_event_fields: Optional[List[str]] = None # Some events can have stripped room state stored in the `unsigned` field. @@ -368,11 +368,24 @@ def serialize_event( e.unsigned["redacted_because"], time_now_ms, config=config ) - if config.token_id is not None: - if config.token_id == getattr(e.internal_metadata, "token_id", None): - txn_id = getattr(e.internal_metadata, "txn_id", None) - if txn_id is not None: - d["unsigned"]["transaction_id"] = txn_id + # If we have a txn_id saved in the internal_metadata, we should include it in the + # unsigned section of the event if it was sent by the same session as the one + # requesting the event. + # There is a special case for guests, because they only have one access token + # without associated access_token_id, so we always include the txn_id for events + # they sent. + txn_id = getattr(e.internal_metadata, "txn_id", None) + if txn_id is not None and config.requester is not None: + event_token_id = getattr(e.internal_metadata, "token_id", None) + if config.requester.user.to_string() == e.sender and ( + ( + event_token_id is not None + and config.requester.access_token_id is not None + and event_token_id == config.requester.access_token_id + ) + or config.requester.is_guest + ): + d["unsigned"]["transaction_id"] = txn_id # invite_room_state and knock_room_state are a list of stripped room state events # that are meant to provide metadata about a room to an invitee/knocker. They are @@ -403,14 +416,6 @@ class EventClientSerializer: clients. """ - def __init__(self, inhibit_replacement_via_edits: bool = False): - """ - Args: - inhibit_replacement_via_edits: If this is set to True, then events are - never replaced by their edits. - """ - self._inhibit_replacement_via_edits = inhibit_replacement_via_edits - def serialize_event( self, event: Union[JsonDict, EventBase], @@ -418,7 +423,6 @@ class EventClientSerializer: *, config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG, bundle_aggregations: Optional[Dict[str, "BundledAggregations"]] = None, - apply_edits: bool = True, ) -> JsonDict: """Serializes a single event. @@ -428,10 +432,7 @@ class EventClientSerializer: config: Event serialization config bundle_aggregations: A map from event_id to the aggregations to be bundled into the event. - apply_edits: Whether the content of the event should be modified to reflect - any replacement in `bundle_aggregations[<event_id>].replace`. - See also the `inhibit_replacement_via_edits` constructor arg: if that is - set to True, then this argument is ignored. + Returns: The serialized event """ @@ -450,38 +451,10 @@ class EventClientSerializer: config, bundle_aggregations, serialized_event, - apply_edits=apply_edits, ) return serialized_event - def _apply_edit( - self, orig_event: EventBase, serialized_event: JsonDict, edit: EventBase - ) -> None: - """Replace the content, preserving existing relations of the serialized event. - - Args: - orig_event: The original event. - serialized_event: The original event, serialized. This is modified. - edit: The event which edits the above. - """ - - # Ensure we take copies of the edit content, otherwise we risk modifying - # the original event. - edit_content = edit.content.copy() - - # Unfreeze the event content if necessary, so that we may modify it below - edit_content = unfreeze(edit_content) - serialized_event["content"] = edit_content.get("m.new_content", {}) - - # Check for existing relations - relates_to = orig_event.content.get("m.relates_to") - if relates_to: - # Keep the relations, ensuring we use a dict copy of the original - serialized_event["content"]["m.relates_to"] = relates_to.copy() - else: - serialized_event["content"].pop("m.relates_to", None) - def _inject_bundled_aggregations( self, event: EventBase, @@ -489,7 +462,6 @@ class EventClientSerializer: config: SerializeEventConfig, bundled_aggregations: Dict[str, "BundledAggregations"], serialized_event: JsonDict, - apply_edits: bool, ) -> None: """Potentially injects bundled aggregations into the unsigned portion of the serialized event. @@ -504,9 +476,6 @@ class EventClientSerializer: While serializing the bundled aggregations this map may be searched again for additional events in a recursive manner. serialized_event: The serialized event which may be modified. - apply_edits: Whether the content of the event should be modified to reflect - any replacement in `aggregations.replace` (subject to the - `inhibit_replacement_via_edits` constructor arg). """ # We have already checked that aggregations exist for this event. @@ -522,11 +491,6 @@ class EventClientSerializer: ] = event_aggregations.references if event_aggregations.replace: - # If there is an edit, optionally apply it to the event. - edit = event_aggregations.replace - if apply_edits and not self._inhibit_replacement_via_edits: - self._apply_edit(event, serialized_event, edit) - # Include information about it in the relations dict. # # Matrix spec v1.5 (https://spec.matrix.org/v1.5/client-server-api/#server-side-aggregation-of-mreplace-relationships) @@ -534,10 +498,7 @@ class EventClientSerializer: # `sender` of the edit; however MSC3925 proposes extending it to the whole # of the edit, which is what we do here. serialized_aggregations[RelationTypes.REPLACE] = self.serialize_event( - edit, - time_now, - config=config, - apply_edits=False, + event_aggregations.replace, time_now, config=config ) # Include any threaded replies to this event. diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py index c508861b6a..0db0bd7304 100644 --- a/synapse/handlers/event_auth.py +++ b/synapse/handlers/event_auth.py @@ -63,9 +63,18 @@ class EventAuthHandler: self._store, event, batched_auth_events ) auth_event_ids = event.auth_event_ids() - auth_events_by_id = await self._store.get_events(auth_event_ids) + if batched_auth_events: - auth_events_by_id.update(batched_auth_events) + # Copy the batched auth events to avoid mutating them. + auth_events_by_id = dict(batched_auth_events) + needed_auth_event_ids = set(auth_event_ids) - set(batched_auth_events) + if needed_auth_event_ids: + auth_events_by_id.update( + await self._store.get_events(needed_auth_event_ids) + ) + else: + auth_events_by_id = await self._store.get_events(auth_event_ids) + check_state_dependent_auth_rules(event, auth_events_by_id.values()) def compute_auth_events( diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 949b69cb41..68c07f0265 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -23,7 +23,7 @@ from synapse.events.utils import SerializeEventConfig from synapse.handlers.presence import format_user_presence_state from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, Requester, UserID from synapse.visibility import filter_events_for_client if TYPE_CHECKING: @@ -46,13 +46,12 @@ class EventStreamHandler: async def get_stream( self, - auth_user_id: str, + requester: Requester, pagin_config: PaginationConfig, timeout: int = 0, as_client_event: bool = True, affect_presence: bool = True, room_id: Optional[str] = None, - is_guest: bool = False, ) -> JsonDict: """Fetches the events stream for a given user.""" @@ -62,13 +61,12 @@ class EventStreamHandler: raise SynapseError(403, "This room has been blocked on this server") # send any outstanding server notices to the user. - await self._server_notices_sender.on_user_syncing(auth_user_id) + await self._server_notices_sender.on_user_syncing(requester.user.to_string()) - auth_user = UserID.from_string(auth_user_id) presence_handler = self.hs.get_presence_handler() context = await presence_handler.user_syncing( - auth_user_id, + requester.user.to_string(), affect_presence=affect_presence, presence_state=PresenceState.ONLINE, ) @@ -82,10 +80,10 @@ class EventStreamHandler: timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1)) stream_result = await self.notifier.get_events_for( - auth_user, + requester.user, pagin_config, timeout, - is_guest=is_guest, + is_guest=requester.is_guest, explicit_room_id=room_id, ) events = stream_result.events @@ -102,7 +100,7 @@ class EventStreamHandler: if event.membership != Membership.JOIN: continue # Send down presence. - if event.state_key == auth_user_id: + if event.state_key == requester.user.to_string(): # Send down presence for everyone in the room. users: Iterable[str] = await self.store.get_users_in_room( event.room_id @@ -124,7 +122,9 @@ class EventStreamHandler: chunks = self._event_serializer.serialize_events( events, time_now, - config=SerializeEventConfig(as_client_event=as_client_event), + config=SerializeEventConfig( + as_client_event=as_client_event, requester=requester + ), ) chunk = { diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index aead0b44b9..b3be7a86f0 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -318,11 +318,9 @@ class InitialSyncHandler: ) is_peeking = member_event_id is None - user_id = requester.user.to_string() - if membership == Membership.JOIN: result = await self._room_initial_sync_joined( - user_id, room_id, pagin_config, membership, is_peeking + requester, room_id, pagin_config, membership, is_peeking ) elif membership == Membership.LEAVE: # The member_event_id will always be available if membership is set @@ -330,10 +328,16 @@ class InitialSyncHandler: assert member_event_id result = await self._room_initial_sync_parted( - user_id, room_id, pagin_config, membership, member_event_id, is_peeking + requester, + room_id, + pagin_config, + membership, + member_event_id, + is_peeking, ) account_data_events = [] + user_id = requester.user.to_string() tags = await self.store.get_tags_for_room(user_id, room_id) if tags: account_data_events.append( @@ -350,7 +354,7 @@ class InitialSyncHandler: async def _room_initial_sync_parted( self, - user_id: str, + requester: Requester, room_id: str, pagin_config: PaginationConfig, membership: str, @@ -369,13 +373,17 @@ class InitialSyncHandler: ) messages = await filter_events_for_client( - self._storage_controllers, user_id, messages, is_peeking=is_peeking + self._storage_controllers, + requester.user.to_string(), + messages, + is_peeking=is_peeking, ) start_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, token) end_token = StreamToken.START.copy_and_replace(StreamKeyType.ROOM, stream_token) time_now = self.clock.time_msec() + serialize_options = SerializeEventConfig(requester=requester) return { "membership": membership, @@ -383,14 +391,18 @@ class InitialSyncHandler: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events( + messages, time_now, config=serialize_options + ) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), }, "state": ( # Don't bundle aggregations as this is a deprecated API. - self._event_serializer.serialize_events(room_state.values(), time_now) + self._event_serializer.serialize_events( + room_state.values(), time_now, config=serialize_options + ) ), "presence": [], "receipts": [], @@ -398,7 +410,7 @@ class InitialSyncHandler: async def _room_initial_sync_joined( self, - user_id: str, + requester: Requester, room_id: str, pagin_config: PaginationConfig, membership: str, @@ -410,9 +422,12 @@ class InitialSyncHandler: # TODO: These concurrently time_now = self.clock.time_msec() + serialize_options = SerializeEventConfig(requester=requester) # Don't bundle aggregations as this is a deprecated API. state = self._event_serializer.serialize_events( - current_state.values(), time_now + current_state.values(), + time_now, + config=serialize_options, ) now_token = self.hs.get_event_sources().get_current_token() @@ -450,7 +465,10 @@ class InitialSyncHandler: if not receipts: return [] - return ReceiptEventSource.filter_out_private_receipts(receipts, user_id) + return ReceiptEventSource.filter_out_private_receipts( + receipts, + requester.user.to_string(), + ) presence, receipts, (messages, token) = await make_deferred_yieldable( gather_results( @@ -469,20 +487,23 @@ class InitialSyncHandler: ) messages = await filter_events_for_client( - self._storage_controllers, user_id, messages, is_peeking=is_peeking + self._storage_controllers, + requester.user.to_string(), + messages, + is_peeking=is_peeking, ) start_token = now_token.copy_and_replace(StreamKeyType.ROOM, token) end_token = now_token - time_now = self.clock.time_msec() - ret = { "room_id": room_id, "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events( + messages, time_now, config=serialize_options + ) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e433d6b01f..da129ec16a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -50,7 +50,7 @@ from synapse.event_auth import validate_event_for_room_version from synapse.events import EventBase, relation_from_event from synapse.events.builder import EventBuilder from synapse.events.snapshot import EventContext, UnpersistedEventContextBase -from synapse.events.utils import maybe_upsert_event_field +from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field from synapse.events.validator import EventValidator from synapse.handlers.directory import DirectoryHandler from synapse.logging import opentracing @@ -245,8 +245,11 @@ class MessageHandler: ) room_state = room_state_events[membership_event_id] - now = self.clock.time_msec() - events = self._event_serializer.serialize_events(room_state.values(), now) + events = self._event_serializer.serialize_events( + room_state.values(), + self.clock.time_msec(), + config=SerializeEventConfig(requester=requester), + ) return events async def _user_can_see_state_at_event( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index ceefa16b49..8c79c055ba 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -579,7 +579,9 @@ class PaginationHandler: time_now = self.clock.time_msec() - serialize_options = SerializeEventConfig(as_client_event=as_client_event) + serialize_options = SerializeEventConfig( + as_client_event=as_client_event, requester=requester + ) chunk = { "chunk": ( diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 553053b694..1d09fdf135 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -20,6 +20,7 @@ import attr from synapse.api.constants import Direction, EventTypes, RelationTypes from synapse.api.errors import SynapseError from synapse.events import EventBase, relation_from_event +from synapse.events.utils import SerializeEventConfig from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import trace from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent @@ -151,16 +152,23 @@ class RelationsHandler: ) now = self._clock.time_msec() + serialize_options = SerializeEventConfig(requester=requester) return_value: JsonDict = { "chunk": self._event_serializer.serialize_events( - events, now, bundle_aggregations=aggregations + events, + now, + bundle_aggregations=aggregations, + config=serialize_options, ), } if include_original_event: # Do not bundle aggregations when retrieving the original event because # we want the content before relations are applied to it. return_value["original_event"] = self._event_serializer.serialize_event( - event, now, bundle_aggregations=None + event, + now, + bundle_aggregations=None, + config=serialize_options, ) if next_token: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b1784638f4..32451670f3 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1123,7 +1123,9 @@ class RoomCreationHandler: event_dict, prev_event_ids=prev_event, depth=depth, - state_map=state_map, + # Take a copy to ensure each event gets a unique copy of + # state_map since it is modified below. + state_map=dict(state_map), for_batch=for_batch, ) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 9bbf83047d..aad4706f14 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -23,7 +23,8 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import NotFoundError, SynapseError from synapse.api.filtering import Filter from synapse.events import EventBase -from synapse.types import JsonDict, StrCollection, StreamKeyType, UserID +from synapse.events.utils import SerializeEventConfig +from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID from synapse.types.state import StateFilter from synapse.visibility import filter_events_for_client @@ -109,12 +110,12 @@ class SearchHandler: return historical_room_ids async def search( - self, user: UserID, content: JsonDict, batch: Optional[str] = None + self, requester: Requester, content: JsonDict, batch: Optional[str] = None ) -> JsonDict: """Performs a full text search for a user. Args: - user: The user performing the search. + requester: The user performing the search. content: Search parameters batch: The next_batch parameter. Used for pagination. @@ -199,7 +200,7 @@ class SearchHandler: ) return await self._search( - user, + requester, batch_group, batch_group_key, batch_token, @@ -217,7 +218,7 @@ class SearchHandler: async def _search( self, - user: UserID, + requester: Requester, batch_group: Optional[str], batch_group_key: Optional[str], batch_token: Optional[str], @@ -235,7 +236,7 @@ class SearchHandler: """Performs a full text search for a user. Args: - user: The user performing the search. + requester: The user performing the search. batch_group: Pagination information. batch_group_key: Pagination information. batch_token: Pagination information. @@ -269,7 +270,7 @@ class SearchHandler: # TODO: Search through left rooms too rooms = await self.store.get_rooms_for_local_user_where_membership_is( - user.to_string(), + requester.user.to_string(), membership_list=[Membership.JOIN], # membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban], ) @@ -303,13 +304,13 @@ class SearchHandler: if order_by == "rank": search_result, sender_group = await self._search_by_rank( - user, room_ids, search_term, keys, search_filter + requester.user, room_ids, search_term, keys, search_filter ) # Unused return values for rank search. global_next_batch = None elif order_by == "recent": search_result, global_next_batch = await self._search_by_recent( - user, + requester.user, room_ids, search_term, keys, @@ -334,7 +335,7 @@ class SearchHandler: assert after_limit is not None contexts = await self._calculate_event_contexts( - user, + requester.user, search_result.allowed_events, before_limit, after_limit, @@ -363,27 +364,37 @@ class SearchHandler: # The returned events. search_result.allowed_events, ), - user.to_string(), + requester.user.to_string(), ) # We're now about to serialize the events. We should not make any # blocking calls after this. Otherwise, the 'age' will be wrong. time_now = self.clock.time_msec() + serialize_options = SerializeEventConfig(requester=requester) for context in contexts.values(): context["events_before"] = self._event_serializer.serialize_events( - context["events_before"], time_now, bundle_aggregations=aggregations + context["events_before"], + time_now, + bundle_aggregations=aggregations, + config=serialize_options, ) context["events_after"] = self._event_serializer.serialize_events( - context["events_after"], time_now, bundle_aggregations=aggregations + context["events_after"], + time_now, + bundle_aggregations=aggregations, + config=serialize_options, ) results = [ { "rank": search_result.rank_map[e.event_id], "result": self._event_serializer.serialize_event( - e, time_now, bundle_aggregations=aggregations + e, + time_now, + bundle_aggregations=aggregations, + config=serialize_options, ), "context": contexts.get(e.event_id, {}), } @@ -398,7 +409,9 @@ class SearchHandler: if state_results: rooms_cat_res["state"] = { - room_id: self._event_serializer.serialize_events(state_events, time_now) + room_id: self._event_serializer.serialize_events( + state_events, time_now, config=serialize_options + ) for room_id, state_events in state_results.items() } diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index abcf687f05..199337673f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -273,10 +273,7 @@ class BulkPushRuleEvaluator: related_event_id, allow_none=True ) if related_event is not None: - related_events[relation_type] = _flatten_dict( - related_event, - msc3873_escape_event_match_key=self.hs.config.experimental.msc3873_escape_event_match_key, - ) + related_events[relation_type] = _flatten_dict(related_event) reply_event_id = ( event.content.get("m.relates_to", {}) @@ -291,10 +288,7 @@ class BulkPushRuleEvaluator: ) if related_event is not None: - related_events["m.in_reply_to"] = _flatten_dict( - related_event, - msc3873_escape_event_match_key=self.hs.config.experimental.msc3873_escape_event_match_key, - ) + related_events["m.in_reply_to"] = _flatten_dict(related_event) # indicate that this is from a fallback relation. if relation_type == "m.thread" and event.content.get( @@ -401,10 +395,7 @@ class BulkPushRuleEvaluator: ) evaluator = PushRuleEvaluator( - _flatten_dict( - event, - msc3873_escape_event_match_key=self.hs.config.experimental.msc3873_escape_event_match_key, - ), + _flatten_dict(event), has_mentions, room_member_count, sender_power_level, @@ -413,8 +404,6 @@ class BulkPushRuleEvaluator: self._related_event_match_enabled, event.room_version.msc3931_push_features, self.hs.config.experimental.msc1767_enabled, # MSC3931 flag - self.hs.config.experimental.msc3758_exact_event_match, - self.hs.config.experimental.msc3966_exact_event_property_contains, ) users = rules_by_user.keys() @@ -496,8 +485,6 @@ def _flatten_dict( d: Union[EventBase, Mapping[str, Any]], prefix: Optional[List[str]] = None, result: Optional[Dict[str, JsonValue]] = None, - *, - msc3873_escape_event_match_key: bool = False, ) -> Dict[str, JsonValue]: """ Given a JSON dictionary (or event) which might contain sub dictionaries, @@ -526,11 +513,10 @@ def _flatten_dict( if result is None: result = {} for key, value in d.items(): - if msc3873_escape_event_match_key: - # Escape periods in the key with a backslash (and backslashes with an - # extra backslash). This is since a period is used as a separator between - # nested fields. - key = key.replace("\\", "\\\\").replace(".", "\\.") + # Escape periods in the key with a backslash (and backslashes with an + # extra backslash). This is since a period is used as a separator between + # nested fields. + key = key.replace("\\", "\\\\").replace(".", "\\.") if _is_simple_value(value): result[".".join(prefix + [key])] = value @@ -538,12 +524,7 @@ def _flatten_dict( result[".".join(prefix + [key])] = [v for v in value if _is_simple_value(v)] elif isinstance(value, Mapping): # do not set `room_version` due to recursion considerations below - _flatten_dict( - value, - prefix=(prefix + [key]), - result=result, - msc3873_escape_event_match_key=msc3873_escape_event_match_key, - ) + _flatten_dict(value, prefix=(prefix + [key]), result=result) # `room_version` should only ever be set when looking at the top level of an event if ( diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py index 15da9cd881..7dd1c10b91 100644 --- a/synapse/rest/admin/server_notice_servlet.py +++ b/synapse/rest/admin/server_notice_servlet.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from http import HTTPStatus -from typing import TYPE_CHECKING, Awaitable, Optional, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from synapse.api.constants import EventTypes from synapse.api.errors import NotFoundError, SynapseError @@ -23,10 +23,10 @@ from synapse.http.servlet import ( parse_json_object_from_request, ) from synapse.http.site import SynapseRequest -from synapse.rest.admin import assert_requester_is_admin -from synapse.rest.admin._base import admin_patterns +from synapse.logging.opentracing import set_tag +from synapse.rest.admin._base import admin_patterns, assert_user_is_admin from synapse.rest.client.transactions import HttpTransactionCache -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, Requester, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -70,10 +70,13 @@ class SendServerNoticeServlet(RestServlet): self.__class__.__name__, ) - async def on_POST( - self, request: SynapseRequest, txn_id: Optional[str] = None + async def _do( + self, + request: SynapseRequest, + requester: Requester, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - await assert_requester_is_admin(self.auth, request) + await assert_user_is_admin(self.auth, requester) body = parse_json_object_from_request(request) assert_params_in_dict(body, ("user_id", "content")) event_type = body.get("type", EventTypes.Message) @@ -106,9 +109,18 @@ class SendServerNoticeServlet(RestServlet): return HTTPStatus.OK, {"event_id": event.event_id} - def on_PUT( + async def on_POST( + self, + request: SynapseRequest, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + return await self._do(request, requester, None) + + async def on_PUT( self, request: SynapseRequest, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, txn_id + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + set_tag("txn_id", txn_id) + return await self.txns.fetch_or_execute_request( + request, requester, self._do, request, requester, txn_id ) diff --git a/synapse/rest/client/events.py b/synapse/rest/client/events.py index 782e7d14e8..694d77d287 100644 --- a/synapse/rest/client/events.py +++ b/synapse/rest/client/events.py @@ -17,6 +17,7 @@ import logging from typing import TYPE_CHECKING, Dict, List, Tuple, Union from synapse.api.errors import SynapseError +from synapse.events.utils import SerializeEventConfig from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_string from synapse.http.site import SynapseRequest @@ -43,9 +44,8 @@ class EventStreamRestServlet(RestServlet): async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) - is_guest = requester.is_guest args: Dict[bytes, List[bytes]] = request.args # type: ignore - if is_guest: + if requester.is_guest: if b"room_id" not in args: raise SynapseError(400, "Guest users must specify room_id param") room_id = parse_string(request, "room_id") @@ -63,13 +63,12 @@ class EventStreamRestServlet(RestServlet): as_client_event = b"raw" not in args chunk = await self.event_stream_handler.get_stream( - requester.user.to_string(), + requester, pagin_config, timeout=timeout, as_client_event=as_client_event, - affect_presence=(not is_guest), + affect_presence=(not requester.is_guest), room_id=room_id, - is_guest=is_guest, ) return 200, chunk @@ -91,9 +90,12 @@ class EventRestServlet(RestServlet): requester = await self.auth.get_user_by_req(request) event = await self.event_handler.get_event(requester.user, None, event_id) - time_now = self.clock.time_msec() if event: - result = self._event_serializer.serialize_event(event, time_now) + result = self._event_serializer.serialize_event( + event, + self.clock.time_msec(), + config=SerializeEventConfig(requester=requester), + ) return 200, result else: return 404, "Event not found." diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py index 61268e3af1..ea10042569 100644 --- a/synapse/rest/client/notifications.py +++ b/synapse/rest/client/notifications.py @@ -72,6 +72,12 @@ class NotificationsServlet(RestServlet): next_token = None + serialize_options = SerializeEventConfig( + event_format=format_event_for_client_v2_without_room_id, + requester=requester, + ) + now = self.clock.time_msec() + for pa in push_actions: returned_pa = { "room_id": pa.room_id, @@ -81,10 +87,8 @@ class NotificationsServlet(RestServlet): "event": ( self._event_serializer.serialize_event( notif_events[pa.event_id], - self.clock.time_msec(), - config=SerializeEventConfig( - event_format=format_event_for_client_v2_without_room_id - ), + now, + config=serialize_options, ) ), } diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 45aee3d3fe..129b6fe6b0 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -37,7 +37,7 @@ from synapse.api.errors import ( UnredactedContentDeletedError, ) from synapse.api.filtering import Filter -from synapse.events.utils import format_event_for_client_v2 +from synapse.events.utils import SerializeEventConfig, format_event_for_client_v2 from synapse.http.server import HttpServer from synapse.http.servlet import ( ResolveRoomIdMixin, @@ -57,7 +57,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.client._base import client_patterns from synapse.rest.client.transactions import HttpTransactionCache from synapse.streams.config import PaginationConfig -from synapse.types import JsonDict, StreamToken, ThirdPartyInstanceID, UserID +from synapse.types import JsonDict, Requester, StreamToken, ThirdPartyInstanceID, UserID from synapse.types.state import StateFilter from synapse.util import json_decoder from synapse.util.cancellation import cancellable @@ -151,15 +151,22 @@ class RoomCreateRestServlet(TransactionRestServlet): PATTERNS = "/createRoom" register_txn_path(self, PATTERNS, http_server) - def on_PUT( + async def on_PUT( self, request: SynapseRequest, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request(request, self.on_POST, request) + return await self.txns.fetch_or_execute_request( + request, requester, self._do, request, requester + ) async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) + return await self._do(request, requester) + async def _do( + self, request: SynapseRequest, requester: Requester + ) -> Tuple[int, JsonDict]: room_id, _, _ = await self._room_creation_handler.create_room( requester, self.get_room_config(request) ) @@ -172,9 +179,9 @@ class RoomCreateRestServlet(TransactionRestServlet): # TODO: Needs unit testing for generic events -class RoomStateEventRestServlet(TransactionRestServlet): +class RoomStateEventRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): - super().__init__(hs) + super().__init__() self.event_creation_handler = hs.get_event_creation_handler() self.room_member_handler = hs.get_room_member_handler() self.message_handler = hs.get_message_handler() @@ -324,16 +331,16 @@ class RoomSendEventRestServlet(TransactionRestServlet): def register(self, http_server: HttpServer) -> None: # /rooms/$roomid/send/$event_type[/$txn_id] PATTERNS = "/rooms/(?P<room_id>[^/]*)/send/(?P<event_type>[^/]*)" - register_txn_path(self, PATTERNS, http_server, with_get=True) + register_txn_path(self, PATTERNS, http_server) - async def on_POST( + async def _do( self, request: SynapseRequest, + requester: Requester, room_id: str, event_type: str, - txn_id: Optional[str] = None, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) content = parse_json_object_from_request(request) event_dict: JsonDict = { @@ -362,18 +369,30 @@ class RoomSendEventRestServlet(TransactionRestServlet): set_tag("event_id", event_id) return 200, {"event_id": event_id} - def on_GET( - self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str - ) -> Tuple[int, str]: - return 200, "Not implemented" + async def on_POST( + self, + request: SynapseRequest, + room_id: str, + event_type: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) + return await self._do(request, requester, room_id, event_type, None) - def on_PUT( + async def on_PUT( self, request: SynapseRequest, room_id: str, event_type: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_id, event_type, txn_id + return await self.txns.fetch_or_execute_request( + request, + requester, + self._do, + request, + requester, + room_id, + event_type, + txn_id, ) @@ -389,14 +408,13 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet): PATTERNS = "/join/(?P<room_identifier>[^/]*)" register_txn_path(self, PATTERNS, http_server) - async def on_POST( + async def _do( self, request: SynapseRequest, + requester: Requester, room_identifier: str, - txn_id: Optional[str] = None, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) - content = parse_json_object_from_request(request, allow_empty_body=True) # twisted.web.server.Request.args is incorrectly defined as Optional[Any] @@ -420,22 +438,31 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet): return 200, {"room_id": room_id} - def on_PUT( + async def on_POST( + self, + request: SynapseRequest, + room_identifier: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) + return await self._do(request, requester, room_identifier, None) + + async def on_PUT( self, request: SynapseRequest, room_identifier: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_identifier, txn_id + return await self.txns.fetch_or_execute_request( + request, requester, self._do, request, requester, room_identifier, txn_id ) # TODO: Needs unit testing -class PublicRoomListRestServlet(TransactionRestServlet): +class PublicRoomListRestServlet(RestServlet): PATTERNS = client_patterns("/publicRooms$", v1=True) def __init__(self, hs: "HomeServer"): - super().__init__(hs) + super().__init__() self.hs = hs self.auth = hs.get_auth() @@ -814,11 +841,13 @@ class RoomEventServlet(RestServlet): [event], requester.user.to_string() ) - time_now = self.clock.time_msec() # per MSC2676, /rooms/{roomId}/event/{eventId}, should return the # *original* event, rather than the edited version event_dict = self._event_serializer.serialize_event( - event, time_now, bundle_aggregations=aggregations, apply_edits=False + event, + self.clock.time_msec(), + bundle_aggregations=aggregations, + config=SerializeEventConfig(requester=requester), ) return 200, event_dict @@ -863,24 +892,30 @@ class RoomEventContextServlet(RestServlet): raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) time_now = self.clock.time_msec() + serializer_options = SerializeEventConfig(requester=requester) results = { "events_before": self._event_serializer.serialize_events( event_context.events_before, time_now, bundle_aggregations=event_context.aggregations, + config=serializer_options, ), "event": self._event_serializer.serialize_event( event_context.event, time_now, bundle_aggregations=event_context.aggregations, + config=serializer_options, ), "events_after": self._event_serializer.serialize_events( event_context.events_after, time_now, bundle_aggregations=event_context.aggregations, + config=serializer_options, ), "state": self._event_serializer.serialize_events( - event_context.state, time_now + event_context.state, + time_now, + config=serializer_options, ), "start": event_context.start, "end": event_context.end, @@ -899,22 +934,25 @@ class RoomForgetRestServlet(TransactionRestServlet): PATTERNS = "/rooms/(?P<room_id>[^/]*)/forget" register_txn_path(self, PATTERNS, http_server) - async def on_POST( - self, request: SynapseRequest, room_id: str, txn_id: Optional[str] = None - ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=False) - + async def _do(self, requester: Requester, room_id: str) -> Tuple[int, JsonDict]: await self.room_member_handler.forget(user=requester.user, room_id=room_id) return 200, {} - def on_PUT( + async def on_POST( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=False) + return await self._do(requester, room_id) + + async def on_PUT( self, request: SynapseRequest, room_id: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=False) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_id, txn_id + return await self.txns.fetch_or_execute_request( + request, requester, self._do, requester, room_id ) @@ -933,15 +971,14 @@ class RoomMembershipRestServlet(TransactionRestServlet): ) register_txn_path(self, PATTERNS, http_server) - async def on_POST( + async def _do( self, request: SynapseRequest, + requester: Requester, room_id: str, membership_action: str, - txn_id: Optional[str] = None, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) - if requester.is_guest and membership_action not in { Membership.JOIN, Membership.LEAVE, @@ -1006,13 +1043,30 @@ class RoomMembershipRestServlet(TransactionRestServlet): return 200, return_value - def on_PUT( + async def on_POST( + self, + request: SynapseRequest, + room_id: str, + membership_action: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) + return await self._do(request, requester, room_id, membership_action, None) + + async def on_PUT( self, request: SynapseRequest, room_id: str, membership_action: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_id, membership_action, txn_id + return await self.txns.fetch_or_execute_request( + request, + requester, + self._do, + request, + requester, + room_id, + membership_action, + txn_id, ) @@ -1028,14 +1082,14 @@ class RoomRedactEventRestServlet(TransactionRestServlet): PATTERNS = "/rooms/(?P<room_id>[^/]*)/redact/(?P<event_id>[^/]*)" register_txn_path(self, PATTERNS, http_server) - async def on_POST( + async def _do( self, request: SynapseRequest, + requester: Requester, room_id: str, event_id: str, - txn_id: Optional[str] = None, + txn_id: Optional[str], ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request) content = parse_json_object_from_request(request) try: @@ -1086,13 +1140,23 @@ class RoomRedactEventRestServlet(TransactionRestServlet): set_tag("event_id", event_id) return 200, {"event_id": event_id} - def on_PUT( + async def on_POST( + self, + request: SynapseRequest, + room_id: str, + event_id: str, + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + return await self._do(request, requester, room_id, event_id, None) + + async def on_PUT( self, request: SynapseRequest, room_id: str, event_id: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self.on_POST, request, room_id, event_id, txn_id + return await self.txns.fetch_or_execute_request( + request, requester, self._do, request, requester, room_id, event_id, txn_id ) @@ -1192,7 +1256,7 @@ class SearchRestServlet(RestServlet): content = parse_json_object_from_request(request) batch = parse_string(request, "next_batch") - results = await self.search_handler.search(requester.user, content, batch) + results = await self.search_handler.search(requester, content, batch) return 200, results @@ -1216,7 +1280,6 @@ def register_txn_path( servlet: RestServlet, regex_string: str, http_server: HttpServer, - with_get: bool = False, ) -> None: """Registers a transaction-based path. @@ -1228,7 +1291,6 @@ def register_txn_path( regex_string: The regex string to register. Must NOT have a trailing $ as this string will be appended to. http_server: The http_server to register paths with. - with_get: True to also register respective GET paths for the PUTs. """ on_POST = getattr(servlet, "on_POST", None) on_PUT = getattr(servlet, "on_PUT", None) @@ -1246,18 +1308,6 @@ def register_txn_path( on_PUT, servlet.__class__.__name__, ) - on_GET = getattr(servlet, "on_GET", None) - if with_get: - if on_GET is None: - raise RuntimeError( - "register_txn_path called with with_get = True, but no on_GET method exists" - ) - http_server.register_paths( - "GET", - client_patterns(regex_string + "/(?P<txn_id>[^/]*)$", v1=True), - on_GET, - servlet.__class__.__name__, - ) class TimestampLookupRestServlet(RestServlet): diff --git a/synapse/rest/client/sendtodevice.py b/synapse/rest/client/sendtodevice.py index 55d52f0b28..110af6df47 100644 --- a/synapse/rest/client/sendtodevice.py +++ b/synapse/rest/client/sendtodevice.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Awaitable, Tuple +from typing import TYPE_CHECKING, Tuple from synapse.http import servlet from synapse.http.server import HttpServer @@ -21,7 +21,7 @@ from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_r from synapse.http.site import SynapseRequest from synapse.logging.opentracing import set_tag from synapse.rest.client.transactions import HttpTransactionCache -from synapse.types import JsonDict +from synapse.types import JsonDict, Requester from ._base import client_patterns @@ -43,19 +43,26 @@ class SendToDeviceRestServlet(servlet.RestServlet): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - def on_PUT( + async def on_PUT( self, request: SynapseRequest, message_type: str, txn_id: str - ) -> Awaitable[Tuple[int, JsonDict]]: + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request, allow_guest=True) set_tag("txn_id", txn_id) - return self.txns.fetch_or_execute_request( - request, self._put, request, message_type, txn_id + return await self.txns.fetch_or_execute_request( + request, + requester, + self._put, + request, + requester, + message_type, ) async def _put( - self, request: SynapseRequest, message_type: str, txn_id: str + self, + request: SynapseRequest, + requester: Requester, + message_type: str, ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) - content = parse_json_object_from_request(request) assert_params_in_dict(content, ("messages",)) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 8fcb8ac3d9..e578b26fa3 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -38,7 +38,7 @@ from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string from synapse.http.site import SynapseRequest from synapse.logging.opentracing import trace_with_opname -from synapse.types import JsonDict, StreamToken +from synapse.types import JsonDict, Requester, StreamToken from synapse.util import json_decoder from ._base import client_patterns, set_timeline_upper_limit @@ -226,7 +226,7 @@ class SyncRestServlet(RestServlet): # We know that the the requester has an access token since appservices # cannot use sync. response_content = await self.encode_response( - time_now, sync_result, requester.access_token_id, filter_collection + time_now, sync_result, requester, filter_collection ) logger.debug("Event formatting complete") @@ -237,7 +237,7 @@ class SyncRestServlet(RestServlet): self, time_now: int, sync_result: SyncResult, - access_token_id: Optional[int], + requester: Requester, filter: FilterCollection, ) -> JsonDict: logger.debug("Formatting events in sync response") @@ -250,12 +250,12 @@ class SyncRestServlet(RestServlet): serialize_options = SerializeEventConfig( event_format=event_formatter, - token_id=access_token_id, + requester=requester, only_event_fields=filter.event_fields, ) stripped_serialize_options = SerializeEventConfig( event_format=event_formatter, - token_id=access_token_id, + requester=requester, include_stripped_room_state=True, ) diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 3f40f1874a..f2aaab6227 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -15,16 +15,16 @@ """This module contains logic for storing HTTP PUT transactions. This is used to ensure idempotency when performing PUTs using the REST API.""" import logging -from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Tuple +from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Hashable, Tuple from typing_extensions import ParamSpec from twisted.internet.defer import Deferred from twisted.python.failure import Failure -from twisted.web.server import Request +from twisted.web.iweb import IRequest from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.types import JsonDict +from synapse.types import JsonDict, Requester from synapse.util.async_helpers import ObservableDeferred if TYPE_CHECKING: @@ -41,53 +41,47 @@ P = ParamSpec("P") class HttpTransactionCache: def __init__(self, hs: "HomeServer"): self.hs = hs - self.auth = self.hs.get_auth() self.clock = self.hs.get_clock() # $txn_key: (ObservableDeferred<(res_code, res_json_body)>, timestamp) self.transactions: Dict[ - str, Tuple[ObservableDeferred[Tuple[int, JsonDict]], int] + Hashable, Tuple[ObservableDeferred[Tuple[int, JsonDict]], int] ] = {} # Try to clean entries every 30 mins. This means entries will exist # for at *LEAST* 30 mins, and at *MOST* 60 mins. self.cleaner = self.clock.looping_call(self._cleanup, CLEANUP_PERIOD_MS) - def _get_transaction_key(self, request: Request) -> str: + def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hashable: """A helper function which returns a transaction key that can be used with TransactionCache for idempotent requests. Idempotency is based on the returned key being the same for separate requests to the same endpoint. The key is formed from the HTTP request - path and the access_token for the requesting user. + path and attributes from the requester: the access_token_id for regular users, + the user ID for guest users, and the appservice ID for appservice users. Args: - request: The incoming request. Must contain an access_token. + request: The incoming request. + requester: The requester doing the request. Returns: A transaction key """ assert request.path is not None - token = self.auth.get_access_token_from_request(request) - return request.path.decode("utf8") + "/" + token + path: str = request.path.decode("utf8") + if requester.is_guest: + assert requester.user is not None, "Guest requester must have a user ID set" + return (path, "guest", requester.user) + elif requester.app_service is not None: + return (path, "appservice", requester.app_service.id) + else: + assert ( + requester.access_token_id is not None + ), "Requester must have an access_token_id" + return (path, "user", requester.access_token_id) def fetch_or_execute_request( self, - request: Request, - fn: Callable[P, Awaitable[Tuple[int, JsonDict]]], - *args: P.args, - **kwargs: P.kwargs, - ) -> Awaitable[Tuple[int, JsonDict]]: - """A helper function for fetch_or_execute which extracts - a transaction key from the given request. - - See: - fetch_or_execute - """ - return self.fetch_or_execute( - self._get_transaction_key(request), fn, *args, **kwargs - ) - - def fetch_or_execute( - self, - txn_key: str, + request: IRequest, + requester: Requester, fn: Callable[P, Awaitable[Tuple[int, JsonDict]]], *args: P.args, **kwargs: P.kwargs, @@ -96,14 +90,15 @@ class HttpTransactionCache: to produce a response for this transaction. Args: - txn_key: A key to ensure idempotency should fetch_or_execute be - called again at a later point in time. + request: + requester: fn: A function which returns a tuple of (response_code, response_dict). *args: Arguments to pass to fn. **kwargs: Keyword arguments to pass to fn. Returns: Deferred which resolves to a tuple of (response_code, response_dict). """ + txn_key = self._get_transaction_key(request, requester) if txn_key in self.transactions: observable = self.transactions[txn_key][0] else: diff --git a/synapse/server.py b/synapse/server.py index a7c32e9a60..df80fc1beb 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -743,7 +743,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: - return EventClientSerializer(self.config.experimental.msc3925_inhibit_edit) + return EventClientSerializer() @cache_in_self def get_password_policy_handler(self) -> PasswordPolicyHandler: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index feaa6cdd07..5efe31aa19 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -672,7 +672,15 @@ class DatabasePool: f = cast(types.FunctionType, func) # type: ignore[redundant-cast] if f.__closure__: for i, cell in enumerate(f.__closure__): - if inspect.isgenerator(cell.cell_contents): + try: + contents = cell.cell_contents + except ValueError: + # cell.cell_contents can raise if the "cell" is empty, + # which indicates that the variable is currently + # unbound. + continue + + if inspect.isgenerator(contents): logger.error( "Programming error: function %s references generator %s " "via its closure", diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index b9c39b1718..a3b6c8ae8e 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -244,9 +244,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker set_tag("include_all_devices", include_all_devices) set_tag("include_deleted_devices", include_deleted_devices) - result = await self.db_pool.runInteraction( - "get_e2e_device_keys", - self._get_e2e_device_keys_txn, + result = await self._get_e2e_device_keys( query_list, include_all_devices, include_deleted_devices, @@ -285,9 +283,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker log_kv(result) return result - def _get_e2e_device_keys_txn( + async def _get_e2e_device_keys( self, - txn: LoggingTransaction, query_list: Collection[Tuple[str, Optional[str]]], include_all_devices: bool = False, include_deleted_devices: bool = False, @@ -319,7 +316,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker if user_list: user_id_in_list_clause, user_args = make_in_list_sql_clause( - txn.database_engine, "user_id", user_list + self.database_engine, "user_id", user_list ) query_clauses.append(user_id_in_list_clause) query_params_list.append(user_args) @@ -332,13 +329,16 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker user_device_id_in_list_clause, user_device_args, ) = make_tuple_in_list_sql_clause( - txn.database_engine, ("user_id", "device_id"), user_device_batch + self.database_engine, ("user_id", "device_id"), user_device_batch ) query_clauses.append(user_device_id_in_list_clause) query_params_list.append(user_device_args) result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {} - for query_clause, query_params in zip(query_clauses, query_params_list): + + def get_e2e_device_keys_txn( + txn: LoggingTransaction, query_clause: str, query_params: list + ) -> None: sql = ( "SELECT user_id, device_id, " " d.display_name, " @@ -361,6 +361,14 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker display_name, db_to_json(key_json) if key_json else None ) + for query_clause, query_params in zip(query_clauses, query_params_list): + await self.db_pool.runInteraction( + "_get_e2e_device_keys", + get_e2e_device_keys_txn, + query_clause, + query_params, + ) + if include_deleted_devices: for user_id, device_id in deleted_devices: if device_id is None: |