diff options
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r-- | synapse/handlers/sync.py | 285 |
1 files changed, 87 insertions, 198 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f3039c3c3f..891435c14d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -334,19 +334,6 @@ class SyncHandler: full_state: bool, cache_context: ResponseCacheContext[SyncRequestKey], ) -> SyncResult: - """The start of the machinery that produces a /sync response. - - See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details. - - This method does high-level bookkeeping: - - tracking the kind of sync in the logging context - - deleting any to_device messages whose delivery has been acknowledged. - - deciding if we should dispatch an instant or delayed response - - marking the sync as being lazily loaded, if appropriate - - Computing the body of the response begins in the next method, - `current_sync_for_user`. - """ if since_token is None: sync_type = "initial_sync" elif full_state: @@ -376,7 +363,7 @@ class SyncHandler: sync_config, since_token, full_state=full_state ) else: - # Otherwise, we wait for something to happen and report it to the user. + async def current_sync_callback( before_token: StreamToken, after_token: StreamToken ) -> SyncResult: @@ -415,12 +402,7 @@ class SyncHandler: since_token: Optional[StreamToken] = None, full_state: bool = False, ) -> SyncResult: - """Generates the response body of a sync result, represented as a SyncResult. - - This is a wrapper around `generate_sync_result` which starts an open tracing - span to track the sync. See `generate_sync_result` for the next part of your - indoctrination. - """ + """Get the sync for client needed to match what the server has now.""" with start_active_span("current_sync_for_user"): log_kv({"since_token": since_token}) sync_result = await self.generate_sync_result( @@ -578,7 +560,7 @@ class SyncHandler: # that have happened since `since_key` up to `end_key`, so we # can just use `get_room_events_stream_for_room`. # Otherwise, we want to return the last N events in the room - # in topological ordering. + # in toplogical ordering. if since_key: events, end_key = await self.store.get_room_events_stream_for_room( room_id, @@ -1060,18 +1042,7 @@ class SyncHandler: since_token: Optional[StreamToken] = None, full_state: bool = False, ) -> SyncResult: - """Generates the response body of a sync result. - - This is represented by a `SyncResult` struct, which is built from small pieces - using a `SyncResultBuilder`. See also - https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync - the `sync_result_builder` is passed as a mutable ("inout") parameter to various - helper functions. These retrieve and process the data which forms the sync body, - often writing to the `sync_result_builder` to store their output. - - At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` - instance to signify that the sync calculation is complete. - """ + """Generates a sync result.""" # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. @@ -1373,22 +1344,14 @@ class SyncHandler: async def _generate_sync_entry_for_account_data( self, sync_result_builder: "SyncResultBuilder" ) -> Dict[str, Dict[str, JsonDict]]: - """Generates the account data portion of the sync response. - - Account data (called "Client Config" in the spec) can be set either globally - or for a specific room. Account data consists of a list of events which - accumulate state, much like a room. - - This function retrieves global and per-room account data. The former is written - to the given `sync_result_builder`. The latter is returned directly, to be - later written to the `sync_result_builder` on a room-by-room basis. + """Generates the account data portion of the sync response. Populates + `sync_result_builder` with the result. Args: sync_result_builder Returns: - A dictionary whose keys (room ids) map to the per room account data for that - room. + A dictionary containing the per room account data. """ sync_config = sync_result_builder.sync_config user_id = sync_result_builder.sync_config.user.to_string() @@ -1396,7 +1359,7 @@ class SyncHandler: if since_token and not sync_result_builder.full_state: ( - global_account_data, + account_data, account_data_by_room, ) = await self.store.get_updated_account_data_for_user( user_id, since_token.account_data_key @@ -1407,23 +1370,23 @@ class SyncHandler: ) if push_rules_changed: - global_account_data["m.push_rules"] = await self.push_rules_for_user( + account_data["m.push_rules"] = await self.push_rules_for_user( sync_config.user ) else: ( - global_account_data, + account_data, account_data_by_room, ) = await self.store.get_account_data_for_user(sync_config.user.to_string()) - global_account_data["m.push_rules"] = await self.push_rules_for_user( + account_data["m.push_rules"] = await self.push_rules_for_user( sync_config.user ) account_data_for_user = await sync_config.filter_collection.filter_account_data( [ {"type": account_data_type, "content": content} - for account_data_type, content in global_account_data.items() + for account_data_type, content in account_data.items() ] ) @@ -1497,31 +1460,18 @@ class SyncHandler: """Generates the rooms portion of the sync response. Populates the `sync_result_builder` with the result. - In the response that reaches the client, rooms are divided into four categories: - `invite`, `join`, `knock`, `leave`. These aren't the same as the four sets of - room ids returned by this function. - Args: sync_result_builder account_data_by_room: Dictionary of per room account data Returns: - Returns a 4-tuple describing rooms the user has joined or left, and users who've - joined or left rooms any rooms the user is in. This gets used later in - `_generate_sync_entry_for_device_list`. - - Its entries are: - - newly_joined_rooms - - newly_joined_or_invited_or_knocked_users - - newly_left_rooms - - newly_left_users + Returns a 4-tuple of + `(newly_joined_rooms, newly_joined_or_invited_users, + newly_left_rooms, newly_left_users)` """ - since_token = sync_result_builder.since_token - - # 1. Start by fetching all ephemeral events in rooms we've joined (if required). user_id = sync_result_builder.sync_config.user.to_string() block_all_room_ephemeral = ( - since_token is None + sync_result_builder.since_token is None and sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral() ) @@ -1535,8 +1485,9 @@ class SyncHandler: ) sync_result_builder.now_token = now_token - # 2. We check up front if anything has changed, if it hasn't then there is + # We check up front if anything has changed, if it hasn't then there is # no point in going further. + since_token = sync_result_builder.since_token if not sync_result_builder.full_state: if since_token and not ephemeral_by_room and not account_data_by_room: have_changed = await self._have_rooms_changed(sync_result_builder) @@ -1549,8 +1500,20 @@ class SyncHandler: logger.debug("no-oping sync") return set(), set(), set(), set() - # 3. Work out which rooms need reporting in the sync response. - ignored_users = await self._get_ignored_users(user_id) + ignored_account_data = ( + await self.store.get_global_account_data_by_type_for_user( + AccountDataTypes.IGNORED_USER_LIST, user_id=user_id + ) + ) + + # If there is ignored users account data and it matches the proper type, + # then use it. + ignored_users: FrozenSet[str] = frozenset() + if ignored_account_data: + ignored_users_data = ignored_account_data.get("ignored_users", {}) + if isinstance(ignored_users_data, dict): + ignored_users = frozenset(ignored_users_data.keys()) + if since_token: room_changes = await self._get_rooms_changed( sync_result_builder, ignored_users @@ -1560,6 +1523,7 @@ class SyncHandler: ) else: room_changes = await self._get_all_rooms(sync_result_builder, ignored_users) + tags_by_room = await self.store.get_tags_for_user(user_id) log_kv({"rooms_changed": len(room_changes.room_entries)}) @@ -1570,8 +1534,6 @@ class SyncHandler: newly_joined_rooms = room_changes.newly_joined_rooms newly_left_rooms = room_changes.newly_left_rooms - # 4. We need to apply further processing to `room_entries` (rooms considered - # joined or archived). async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None: logger.debug("Generating room entry for %s", room_entry.room_id) await self._generate_room_entry( @@ -1590,13 +1552,31 @@ class SyncHandler: sync_result_builder.invited.extend(invited) sync_result_builder.knocked.extend(knocked) - # 5. Work out which users have joined or left rooms we're in. We use this - # to build the device_list part of the sync response in - # `_generate_sync_entry_for_device_list`. - ( - newly_joined_or_invited_or_knocked_users, - newly_left_users, - ) = sync_result_builder.calculate_user_changes() + # Now we want to get any newly joined, invited or knocking users + newly_joined_or_invited_or_knocked_users = set() + newly_left_users = set() + if since_token: + for joined_sync in sync_result_builder.joined: + it = itertools.chain( + joined_sync.timeline.events, joined_sync.state.values() + ) + for event in it: + if event.type == EventTypes.Member: + if ( + event.membership == Membership.JOIN + or event.membership == Membership.INVITE + or event.membership == Membership.KNOCK + ): + newly_joined_or_invited_or_knocked_users.add( + event.state_key + ) + else: + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership == Membership.JOIN: + newly_left_users.add(event.state_key) + + newly_left_users -= newly_joined_or_invited_or_knocked_users return ( set(newly_joined_rooms), @@ -1605,36 +1585,11 @@ class SyncHandler: newly_left_users, ) - async def _get_ignored_users(self, user_id: str) -> FrozenSet[str]: - """Retrieve the users ignored by the given user from their global account_data. - - Returns an empty set if - - there is no global account_data entry for ignored_users - - there is such an entry, but it's not a JSON object. - """ - # TODO: Can we `SELECT ignored_user_id FROM ignored_users WHERE ignorer_user_id=?;` instead? - ignored_account_data = ( - await self.store.get_global_account_data_by_type_for_user( - AccountDataTypes.IGNORED_USER_LIST, user_id=user_id - ) - ) - - # If there is ignored users account data and it matches the proper type, - # then use it. - ignored_users: FrozenSet[str] = frozenset() - if ignored_account_data: - ignored_users_data = ignored_account_data.get("ignored_users", {}) - if isinstance(ignored_users_data, dict): - ignored_users = frozenset(ignored_users_data.keys()) - return ignored_users - async def _have_rooms_changed( self, sync_result_builder: "SyncResultBuilder" ) -> bool: """Returns whether there may be any new events that should be sent down the sync. Returns True if there are. - - Does not modify the `sync_result_builder`. """ user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token @@ -1642,13 +1597,12 @@ class SyncHandler: assert since_token - # Get a list of membership change events that have happened to the user - # requesting the sync. - membership_changes = await self.store.get_membership_changes_for_user( + # Get a list of membership change events that have happened. + rooms_changed = await self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key ) - if membership_changes: + if rooms_changed: return True stream_id = since_token.room_key.stream @@ -1660,25 +1614,7 @@ class SyncHandler: async def _get_rooms_changed( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str] ) -> _RoomChanges: - """Determine the changes in rooms to report to the user. - - Ideally, we want to report all events whose stream ordering `s` lies in the - range `since_token < s <= now_token`, where the two tokens are read from the - sync_result_builder. - - If there are too many events in that range to report, things get complicated. - In this situation we return a truncated list of the most recent events, and - indicate in the response that there is a "gap" of omitted events. Additionally: - - - we include a "state_delta", to describe the changes in state over the gap, - - we include all membership events applying to the user making the request, - even those in the gap. - - See the spec for the rationale: - https://spec.matrix.org/v1.1/client-server-api/#syncing - - The sync_result_builder is not modified by this function. - """ + """Gets the the changes that have happened since the last sync.""" user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token @@ -1686,36 +1622,21 @@ class SyncHandler: assert since_token - # The spec - # https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync - # notes that membership events need special consideration: - # - # > When a sync is limited, the server MUST return membership events for events - # > in the gap (between since and the start of the returned timeline), regardless - # > as to whether or not they are redundant. - # - # We fetch such events here, but we only seem to use them for categorising rooms - # as newly joined, newly left, invited or knocked. - # TODO: we've already called this function and ran this query in - # _have_rooms_changed. We could keep the results in memory to avoid a - # second query, at the cost of more complicated source code. - membership_change_events = await self.store.get_membership_changes_for_user( + # Get a list of membership change events that have happened. + rooms_changed = await self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key ) mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} - for event in membership_change_events: + for event in rooms_changed: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - newly_joined_rooms: List[str] = [] - newly_left_rooms: List[str] = [] - room_entries: List[RoomSyncResultBuilder] = [] - invited: List[InvitedSyncResult] = [] - knocked: List[KnockedSyncResult] = [] + newly_joined_rooms = [] + newly_left_rooms = [] + room_entries = [] + invited = [] + knocked = [] for room_id, events in mem_change_events_by_room_id.items(): - # The body of this loop will add this room to at least one of the five lists - # above. Things get messy if you've e.g. joined, left, joined then left the - # room all in the same sync period. logger.debug( "Membership changes in %s: [%s]", room_id, @@ -1770,7 +1691,6 @@ class SyncHandler: if not non_joins: continue - last_non_join = non_joins[-1] # Check if we have left the room. This can either be because we were # joined before *or* that we since joined and then left. @@ -1792,18 +1712,18 @@ class SyncHandler: newly_left_rooms.append(room_id) # Only bother if we're still currently invited - should_invite = last_non_join.membership == Membership.INVITE + should_invite = non_joins[-1].membership == Membership.INVITE if should_invite: - if last_non_join.sender not in ignored_users: - invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join) + if event.sender not in ignored_users: + invite_room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) if invite_room_sync: invited.append(invite_room_sync) # Only bother if our latest membership in the room is knock (and we haven't # been accepted/rejected in the meantime). - should_knock = last_non_join.membership == Membership.KNOCK + should_knock = non_joins[-1].membership == Membership.KNOCK if should_knock: - knock_room_sync = KnockedSyncResult(room_id, knock=last_non_join) + knock_room_sync = KnockedSyncResult(room_id, knock=non_joins[-1]) if knock_room_sync: knocked.append(knock_room_sync) @@ -1861,9 +1781,7 @@ class SyncHandler: timeline_limit = sync_config.filter_collection.timeline_limit() - # Get all events since the `from_key` in rooms we're currently joined to. - # If there are too many, we get the most recent events only. This leaves - # a "gap" in the timeline, as described by the spec for /sync. + # Get all events for rooms we're currently joined to. room_to_events = await self.store.get_room_events_stream_for_rooms( room_ids=sync_result_builder.joined_room_ids, from_key=since_token.room_key, @@ -1924,10 +1842,6 @@ class SyncHandler: ) -> _RoomChanges: """Returns entries for all rooms for the user. - Like `_get_rooms_changed`, but assumes the `since_token` is `None`. - - This function does not modify the sync_result_builder. - Args: sync_result_builder ignored_users: Set of users ignored by user. @@ -1939,9 +1853,16 @@ class SyncHandler: now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config + membership_list = ( + Membership.INVITE, + Membership.KNOCK, + Membership.JOIN, + Membership.LEAVE, + Membership.BAN, + ) + room_list = await self.store.get_rooms_for_local_user_where_membership_is( - user_id=user_id, - membership_list=Membership.LIST, + user_id=user_id, membership_list=membership_list ) room_entries = [] @@ -2291,7 +2212,8 @@ def _calculate_state( # to only include membership events for the senders in the timeline. # In practice, we can do this by removing them from the p_ids list, # which is the list of relevant state we know we have already sent to the client. - # see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 + # see https://github.com/matrix-org/synapse/pull/2970 + # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809 if lazy_load_members: p_ids.difference_update( @@ -2340,39 +2262,6 @@ class SyncResultBuilder: groups: Optional[GroupsSyncResult] = None to_device: List[JsonDict] = attr.Factory(list) - def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]: - """Work out which other users have joined or left rooms we are joined to. - - This data only is only useful for an incremental sync. - - The SyncResultBuilder is not modified by this function. - """ - newly_joined_or_invited_or_knocked_users = set() - newly_left_users = set() - if self.since_token: - for joined_sync in self.joined: - it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.values() - ) - for event in it: - if event.type == EventTypes.Member: - if ( - event.membership == Membership.JOIN - or event.membership == Membership.INVITE - or event.membership == Membership.KNOCK - ): - newly_joined_or_invited_or_knocked_users.add( - event.state_key - ) - else: - prev_content = event.unsigned.get("prev_content", {}) - prev_membership = prev_content.get("membership", None) - if prev_membership == Membership.JOIN: - newly_left_users.add(event.state_key) - - newly_left_users -= newly_joined_or_invited_or_knocked_users - return newly_joined_or_invited_or_knocked_users, newly_left_users - @attr.s(slots=True, auto_attribs=True) class RoomSyncResultBuilder: |