From 96f6293de51c2fcf530bb6ca3705cf596c19656f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 22 Jun 2021 04:02:53 -0500 Subject: Add endpoints for backfilling history (MSC2716) (#9247) Work on https://github.com/matrix-org/matrix-doc/pull/2716 --- synapse/handlers/message.py | 104 ++++++++++++++++++++++++++++++++++++++-- synapse/handlers/room_member.py | 90 ++++++++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4d2255bdf1..db12abd59d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -482,6 +482,9 @@ class EventCreationHandler: prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, require_consent: bool = True, + outlier: bool = False, + historical: bool = False, + depth: Optional[int] = None, ) -> Tuple[EventBase, EventContext]: """ Given a dict from a client, create a new event. @@ -508,6 +511,14 @@ class EventCreationHandler: require_consent: Whether to check if the requester has consented to the privacy policy. + + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. + Raises: ResourceLimitError if server is blocked to some resource being exceeded @@ -563,11 +574,36 @@ class EventCreationHandler: if txn_id is not None: builder.internal_metadata.txn_id = txn_id + builder.internal_metadata.outlier = outlier + + builder.internal_metadata.historical = historical + + # Strip down the auth_event_ids to only what we need to auth the event. + # For example, we don't need extra m.room.member that don't match event.sender + if auth_event_ids is not None: + temp_event = await builder.build( + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + depth=depth, + ) + auth_events = await self.store.get_events_as_list(auth_event_ids) + # Create a StateMap[str] + auth_event_state_map = { + (e.type, e.state_key): e.event_id for e in auth_events + } + # Actually strip down and use the necessary auth events + auth_event_ids = self.auth.compute_auth_events( + event=temp_event, + current_state_ids=auth_event_state_map, + for_verification=False, + ) + event, context = await self.create_new_client_event( builder=builder, requester=requester, prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids, + depth=depth, ) # In an ideal world we wouldn't need the second part of this condition. However, @@ -724,9 +760,13 @@ class EventCreationHandler: self, requester: Requester, event_dict: dict, + prev_event_ids: Optional[List[str]] = None, + auth_event_ids: Optional[List[str]] = None, ratelimit: bool = True, txn_id: Optional[str] = None, ignore_shadow_ban: bool = False, + outlier: bool = False, + depth: Optional[int] = None, ) -> Tuple[EventBase, int]: """ Creates an event, then sends it. @@ -736,10 +776,24 @@ class EventCreationHandler: Args: requester: The requester sending the event. event_dict: An entire event. + prev_event_ids: + The event IDs to use as the prev events. + Should normally be left as None to automatically request them + from the database. + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. ratelimit: Whether to rate limit this send. txn_id: The transaction ID. ignore_shadow_ban: True if shadow-banned users should be allowed to send this event. + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. Returns: The event, and its stream ordering (if deduplication happened, @@ -779,7 +833,13 @@ class EventCreationHandler: return event, event.internal_metadata.stream_ordering event, context = await self.create_event( - requester, event_dict, txn_id=txn_id + requester, + event_dict, + txn_id=txn_id, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + outlier=outlier, + depth=depth, ) assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( @@ -811,6 +871,7 @@ class EventCreationHandler: requester: Optional[Requester] = None, prev_event_ids: Optional[List[str]] = None, auth_event_ids: Optional[List[str]] = None, + depth: Optional[int] = None, ) -> Tuple[EventBase, EventContext]: """Create a new event for a local client @@ -828,6 +889,10 @@ class EventCreationHandler: Should normally be left as None, which will cause them to be calculated based on the room state at the prev_events. + depth: Override the depth used to order the event in the DAG. + Should normally be set to None, which will cause the depth to be calculated + based on the prev_events. + Returns: Tuple of created event, context """ @@ -851,9 +916,24 @@ class EventCreationHandler: ), "Attempting to create an event with no prev_events" event = await builder.build( - prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + depth=depth, ) - context = await self.state.compute_event_context(event) + + old_state = None + + # Pass on the outlier property from the builder to the event + # after it is created + if builder.internal_metadata.outlier: + event.internal_metadata.outlier = builder.internal_metadata.outlier + + # Calculate the state for outliers that pass in their own `auth_event_ids` + if auth_event_ids: + old_state = await self.store.get_events_as_list(auth_event_ids) + + context = await self.state.compute_event_context(event, old_state=old_state) + if requester: context.app_service = requester.app_service @@ -1018,7 +1098,13 @@ class EventCreationHandler: the arguments. """ - await self.action_generator.handle_push_actions_for_event(event, context) + # Skip push notification actions for historical messages + # because we don't want to notify people about old history back in time. + # The historical messages also do not have the proper `context.current_state_ids` + # and `state_groups` because they have `prev_events` that aren't persisted yet + # (historical messages persisted in reverse-chronological order). + if not event.internal_metadata.is_historical(): + await self.action_generator.handle_push_actions_for_event(event, context) try: # If we're a worker we need to hit out to the master. @@ -1317,13 +1403,21 @@ class EventCreationHandler: if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") + # Mark any `m.historical` messages as backfilled so they don't appear + # in `/sync` and have the proper decrementing `stream_ordering` as we import + backfilled = False + if event.internal_metadata.is_historical(): + backfilled = True + # Note that this returns the event that was persisted, which may not be # the same as we passed in if it was deduplicated due transaction IDs. ( event, event_pos, max_stream_token, - ) = await self.storage.persistence.persist_event(event, context=context) + ) = await self.storage.persistence.persist_event( + event, context=context, backfilled=backfilled + ) if self._ephemeral_events_enabled: # If there's an expiry timestamp on the event, schedule its expiry. diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index a49a61a34c..1192591609 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -257,11 +257,42 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): room_id: str, membership: str, prev_event_ids: List[str], + auth_event_ids: Optional[List[str]] = None, txn_id: Optional[str] = None, ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, + outlier: bool = False, ) -> Tuple[str, int]: + """ + Internal membership update function to get an existing event or create + and persist a new event for the new membership change. + + Args: + requester: + target: + room_id: + membership: + prev_event_ids: The event IDs to use as the prev events + + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. + + txn_id: + ratelimit: + content: + require_consent: + + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + + Returns: + Tuple of event ID and stream ordering position + """ + user_id = target.to_string() if content is None: @@ -298,7 +329,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): }, txn_id=txn_id, prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, require_consent=require_consent, + outlier=outlier, ) prev_state_ids = await context.get_prev_state_ids() @@ -399,6 +432,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, + outlier: bool = False, + prev_event_ids: Optional[List[str]] = None, + auth_event_ids: Optional[List[str]] = None, ) -> Tuple[str, int]: """Update a user's membership in a room. @@ -413,6 +449,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit: Whether to rate limit the request. content: The content of the created event. require_consent: Whether consent is required. + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + prev_event_ids: The event IDs to use as the prev events + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. Returns: A tuple of the new event ID and stream ID. @@ -439,6 +483,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit=ratelimit, content=content, require_consent=require_consent, + outlier=outlier, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, ) return result @@ -455,10 +502,36 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit: bool = True, content: Optional[dict] = None, require_consent: bool = True, + outlier: bool = False, + prev_event_ids: Optional[List[str]] = None, + auth_event_ids: Optional[List[str]] = None, ) -> Tuple[str, int]: """Helper for update_membership. Assumes that the membership linearizer is already held for the room. + + Args: + requester: + target: + room_id: + action: + txn_id: + remote_room_hosts: + third_party_signed: + ratelimit: + content: + require_consent: + outlier: Indicates whether the event is an `outlier`, i.e. if + it's from an arbitrary point and floating in the DAG as + opposed to being inline with the current DAG. + prev_event_ids: The event IDs to use as the prev events + auth_event_ids: + The event ids to use as the auth_events for the new event. + Should normally be left as None, which will cause them to be calculated + based on the room state at the prev_events. + + Returns: + A tuple of the new event ID and stream ID. """ content_specified = bool(content) if content is None: @@ -543,6 +616,21 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if block_invite: raise SynapseError(403, "Invites have been disabled on this server") + if prev_event_ids: + return await self._local_membership_update( + requester=requester, + target=target, + room_id=room_id, + membership=effective_membership_state, + txn_id=txn_id, + ratelimit=ratelimit, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + content=content, + require_consent=require_consent, + outlier=outlier, + ) + latest_event_ids = await self.store.get_prev_events_for_room(room_id) current_state_ids = await self.state_handler.get_current_state_ids( @@ -732,8 +820,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): txn_id=txn_id, ratelimit=ratelimit, prev_event_ids=latest_event_ids, + auth_event_ids=auth_event_ids, content=content, require_consent=require_consent, + outlier=outlier, ) async def transfer_room_state_on_room_upgrade( -- cgit 1.4.1