diff options
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r-- | synapse/handlers/sync.py | 254 |
1 files changed, 155 insertions, 99 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 86c3c7f0df..edfdb99cbd 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -505,10 +505,13 @@ class SyncHandler: else: limited = False + log_kv({"limited": limited}) + if potential_recents: recents = sync_config.filter_collection.filter_room_timeline( potential_recents ) + log_kv({"recents_after_sync_filtering": len(recents)}) # We check if there are any state events, if there are then we pass # all current state events to the filter_events function. This is to @@ -526,6 +529,7 @@ class SyncHandler: recents, always_include_ids=current_state_ids, ) + log_kv({"recents_after_visibility_filtering": len(recents)}) else: recents = [] @@ -566,10 +570,15 @@ class SyncHandler: events, end_key = await self.store.get_recent_events_for_room( room_id, limit=load_limit + 1, end_token=end_key ) + + log_kv({"loaded_recents": len(events)}) + loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) + log_kv({"loaded_recents_after_sync_filtering": len(loaded_recents)}) + # We check if there are any state events, if there are then we pass # all current state events to the filter_events function. This is to # ensure that we always include current state in the timeline @@ -586,6 +595,9 @@ class SyncHandler: loaded_recents, always_include_ids=current_state_ids, ) + + log_kv({"loaded_recents_after_client_filtering": len(loaded_recents)}) + loaded_recents.extend(recents) recents = loaded_recents @@ -1116,6 +1128,8 @@ class SyncHandler: logger.debug("Fetching group data") await self._generate_sync_entry_for_groups(sync_result_builder) + num_events = 0 + # debug for https://github.com/matrix-org/synapse/issues/4422 for joined_room in sync_result_builder.joined: room_id = joined_room.room_id @@ -1123,6 +1137,14 @@ class SyncHandler: issue4422_logger.debug( "Sync result for newly joined room %s: %r", room_id, joined_room ) + num_events += len(joined_room.timeline.events) + + log_kv( + { + "joined_rooms_in_result": len(sync_result_builder.joined), + "events_in_result": num_events, + } + ) logger.debug("Sync response calculation complete") return SyncResult( @@ -1467,6 +1489,7 @@ class SyncHandler: if not sync_result_builder.full_state: if since_token and not ephemeral_by_room and not account_data_by_room: have_changed = await self._have_rooms_changed(sync_result_builder) + log_kv({"rooms_have_changed": have_changed}) if not have_changed: tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key @@ -1501,13 +1524,15 @@ class SyncHandler: tags_by_room = await self.store.get_tags_for_user(user_id) + log_kv({"rooms_changed": len(room_changes.room_entries)}) + room_entries = room_changes.room_entries invited = room_changes.invited knocked = room_changes.knocked newly_joined_rooms = room_changes.newly_joined_rooms newly_left_rooms = room_changes.newly_left_rooms - async def handle_room_entries(room_entry): + async def handle_room_entries(room_entry: "RoomSyncResultBuilder"): logger.debug("Generating room entry for %s", room_entry.room_id) res = await self._generate_room_entry( sync_result_builder, @@ -1933,125 +1958,156 @@ class SyncHandler: since_token = room_builder.since_token upto_token = room_builder.upto_token - batch = await self._load_filtered_recents( - room_id, - sync_config, - now_token=upto_token, - since_token=since_token, - potential_recents=events, - newly_joined_room=newly_joined, - ) + with start_active_span("generate_room_entry"): + set_tag("room_id", room_id) + log_kv({"events": len(events or ())}) - # Note: `batch` can be both empty and limited here in the case where - # `_load_filtered_recents` can't find any events the user should see - # (e.g. due to having ignored the sender of the last 50 events). + log_kv( + { + "since_token": since_token, + "upto_token": upto_token, + } + ) - if newly_joined: - # debug for https://github.com/matrix-org/synapse/issues/4422 - issue4422_logger.debug( - "Timeline events after filtering in newly-joined room %s: %r", + batch = await self._load_filtered_recents( room_id, - batch, + sync_config, + now_token=upto_token, + since_token=since_token, + potential_recents=events, + newly_joined_room=newly_joined, + ) + log_kv( + { + "batch_events": len(batch.events), + "prev_batch": batch.prev_batch, + "batch_limited": batch.limited, + } ) - # When we join the room (or the client requests full_state), we should - # send down any existing tags. Usually the user won't have tags in a - # newly joined room, unless either a) they've joined before or b) the - # tag was added by synapse e.g. for server notice rooms. - if full_state: - user_id = sync_result_builder.sync_config.user.to_string() - tags = await self.store.get_tags_for_room(user_id, room_id) + # Note: `batch` can be both empty and limited here in the case where + # `_load_filtered_recents` can't find any events the user should see + # (e.g. due to having ignored the sender of the last 50 events). - # If there aren't any tags, don't send the empty tags list down - # sync - if not tags: - tags = None + if newly_joined: + # debug for https://github.com/matrix-org/synapse/issues/4422 + issue4422_logger.debug( + "Timeline events after filtering in newly-joined room %s: %r", + room_id, + batch, + ) - account_data_events = [] - if tags is not None: - account_data_events.append({"type": "m.tag", "content": {"tags": tags}}) + # When we join the room (or the client requests full_state), we should + # send down any existing tags. Usually the user won't have tags in a + # newly joined room, unless either a) they've joined before or b) the + # tag was added by synapse e.g. for server notice rooms. + if full_state: + user_id = sync_result_builder.sync_config.user.to_string() + tags = await self.store.get_tags_for_room(user_id, room_id) - for account_data_type, content in account_data.items(): - account_data_events.append({"type": account_data_type, "content": content}) + # If there aren't any tags, don't send the empty tags list down + # sync + if not tags: + tags = None - account_data_events = sync_config.filter_collection.filter_room_account_data( - account_data_events - ) + account_data_events = [] + if tags is not None: + account_data_events.append({"type": "m.tag", "content": {"tags": tags}}) - ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) + for account_data_type, content in account_data.items(): + account_data_events.append( + {"type": account_data_type, "content": content} + ) - if not ( - always_include or batch or account_data_events or ephemeral or full_state - ): - return + account_data_events = ( + sync_config.filter_collection.filter_room_account_data( + account_data_events + ) + ) - state = await self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, full_state=full_state - ) + ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) - summary: Optional[JsonDict] = {} - - # we include a summary in room responses when we're lazy loading - # members (as the client otherwise doesn't have enough info to form - # the name itself). - if sync_config.filter_collection.lazy_load_members() and ( - # we recalculate the summary: - # if there are membership changes in the timeline, or - # if membership has changed during a gappy sync, or - # if this is an initial sync. - any(ev.type == EventTypes.Member for ev in batch.events) - or ( - # XXX: this may include false positives in the form of LL - # members which have snuck into state - batch.limited - and any(t == EventTypes.Member for (t, k) in state) - ) - or since_token is None - ): - summary = await self.compute_summary( - room_id, sync_config, batch, state, now_token - ) + if not ( + always_include + or batch + or account_data_events + or ephemeral + or full_state + ): + return - if room_builder.rtype == "joined": - unread_notifications: Dict[str, int] = {} - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - ephemeral=ephemeral, - account_data=account_data_events, - unread_notifications=unread_notifications, - summary=summary, - unread_count=0, + state = await self.compute_state_delta( + room_id, + batch, + sync_config, + since_token, + now_token, + full_state=full_state, ) - if room_sync or always_include: - notifs = await self.unread_notifs_for_room_id(room_id, sync_config) + summary: Optional[JsonDict] = {} + + # we include a summary in room responses when we're lazy loading + # members (as the client otherwise doesn't have enough info to form + # the name itself). + if sync_config.filter_collection.lazy_load_members() and ( + # we recalculate the summary: + # if there are membership changes in the timeline, or + # if membership has changed during a gappy sync, or + # if this is an initial sync. + any(ev.type == EventTypes.Member for ev in batch.events) + or ( + # XXX: this may include false positives in the form of LL + # members which have snuck into state + batch.limited + and any(t == EventTypes.Member for (t, k) in state) + ) + or since_token is None + ): + summary = await self.compute_summary( + room_id, sync_config, batch, state, now_token + ) + + if room_builder.rtype == "joined": + unread_notifications: Dict[str, int] = {} + room_sync = JoinedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + ephemeral=ephemeral, + account_data=account_data_events, + unread_notifications=unread_notifications, + summary=summary, + unread_count=0, + ) - unread_notifications["notification_count"] = notifs["notify_count"] - unread_notifications["highlight_count"] = notifs["highlight_count"] + if room_sync or always_include: + notifs = await self.unread_notifs_for_room_id(room_id, sync_config) - room_sync.unread_count = notifs["unread_count"] + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] - sync_result_builder.joined.append(room_sync) + room_sync.unread_count = notifs["unread_count"] - if batch.limited and since_token: - user_id = sync_result_builder.sync_config.user.to_string() - logger.debug( - "Incremental gappy sync of %s for user %s with %d state events" - % (room_id, user_id, len(state)) + sync_result_builder.joined.append(room_sync) + + if batch.limited and since_token: + user_id = sync_result_builder.sync_config.user.to_string() + logger.debug( + "Incremental gappy sync of %s for user %s with %d state events" + % (room_id, user_id, len(state)) + ) + elif room_builder.rtype == "archived": + archived_room_sync = ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + account_data=account_data_events, ) - elif room_builder.rtype == "archived": - archived_room_sync = ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - account_data=account_data_events, - ) - if archived_room_sync or always_include: - sync_result_builder.archived.append(archived_room_sync) - else: - raise Exception("Unrecognized rtype: %r", room_builder.rtype) + if archived_room_sync or always_include: + sync_result_builder.archived.append(archived_room_sync) + else: + raise Exception("Unrecognized rtype: %r", room_builder.rtype) async def get_rooms_for_user_at( self, user_id: str, room_key: RoomStreamToken |