diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 267 |
2 files changed, 142 insertions, 127 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index d62ccd1dbc..c9ef90ccaa 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ try: except ImportError: pass -__version__ = "1.43.0rc1" +__version__ = "1.43.0rc2" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 91d24534eb..7523d8e839 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1533,21 +1533,18 @@ class SyncHandler: newly_left_rooms = room_changes.newly_left_rooms async def handle_room_entries(room_entry: "RoomSyncResultBuilder"): - with start_active_span("generate_room_entry"): - set_tag("room_id", room_entry.room_id) - log_kv({"events": len(room_entry.events or [])}) - logger.debug("Generating room entry for %s", room_entry.room_id) - res = await self._generate_room_entry( - sync_result_builder, - ignored_users, - room_entry, - ephemeral=ephemeral_by_room.get(room_entry.room_id, []), - tags=tags_by_room.get(room_entry.room_id), - account_data=account_data_by_room.get(room_entry.room_id, {}), - always_include=sync_result_builder.full_state, - ) - logger.debug("Generated room entry for %s", room_entry.room_id) - return res + logger.debug("Generating room entry for %s", room_entry.room_id) + res = await self._generate_room_entry( + sync_result_builder, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + always_include=sync_result_builder.full_state, + ) + logger.debug("Generated room entry for %s", room_entry.room_id) + return res await concurrently_execute(handle_room_entries, room_entries, 10) @@ -1960,139 +1957,157 @@ class SyncHandler: room_id = room_builder.room_id since_token = room_builder.since_token upto_token = room_builder.upto_token - log_kv( - { - "since_token": since_token, - "upto_token": upto_token, - } - ) - batch = await self._load_filtered_recents( - room_id, - sync_config, - now_token=upto_token, - since_token=since_token, - potential_recents=events, - newly_joined_room=newly_joined, - ) - log_kv( - { - "batch_events": len(batch.events), - "prev_batch": batch.prev_batch, - "batch_limited": batch.limited, - } - ) + with start_active_span("generate_room_entry"): + set_tag("room_id", room_id) + log_kv({"events": len(events or ())}) - # Note: `batch` can be both empty and limited here in the case where - # `_load_filtered_recents` can't find any events the user should see - # (e.g. due to having ignored the sender of the last 50 events). + log_kv( + { + "since_token": since_token, + "upto_token": upto_token, + } + ) - if newly_joined: - # debug for https://github.com/matrix-org/synapse/issues/4422 - issue4422_logger.debug( - "Timeline events after filtering in newly-joined room %s: %r", + batch = await self._load_filtered_recents( room_id, - batch, + sync_config, + now_token=upto_token, + since_token=since_token, + potential_recents=events, + newly_joined_room=newly_joined, + ) + log_kv( + { + "batch_events": len(batch.events), + "prev_batch": batch.prev_batch, + "batch_limited": batch.limited, + } ) - # When we join the room (or the client requests full_state), we should - # send down any existing tags. Usually the user won't have tags in a - # newly joined room, unless either a) they've joined before or b) the - # tag was added by synapse e.g. for server notice rooms. - if full_state: - user_id = sync_result_builder.sync_config.user.to_string() - tags = await self.store.get_tags_for_room(user_id, room_id) + # Note: `batch` can be both empty and limited here in the case where + # `_load_filtered_recents` can't find any events the user should see + # (e.g. due to having ignored the sender of the last 50 events). - # If there aren't any tags, don't send the empty tags list down - # sync - if not tags: - tags = None + if newly_joined: + # debug for https://github.com/matrix-org/synapse/issues/4422 + issue4422_logger.debug( + "Timeline events after filtering in newly-joined room %s: %r", + room_id, + batch, + ) - account_data_events = [] - if tags is not None: - account_data_events.append({"type": "m.tag", "content": {"tags": tags}}) + # When we join the room (or the client requests full_state), we should + # send down any existing tags. Usually the user won't have tags in a + # newly joined room, unless either a) they've joined before or b) the + # tag was added by synapse e.g. for server notice rooms. + if full_state: + user_id = sync_result_builder.sync_config.user.to_string() + tags = await self.store.get_tags_for_room(user_id, room_id) - for account_data_type, content in account_data.items(): - account_data_events.append({"type": account_data_type, "content": content}) + # If there aren't any tags, don't send the empty tags list down + # sync + if not tags: + tags = None - account_data_events = sync_config.filter_collection.filter_room_account_data( - account_data_events - ) + account_data_events = [] + if tags is not None: + account_data_events.append({"type": "m.tag", "content": {"tags": tags}}) - ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) + for account_data_type, content in account_data.items(): + account_data_events.append( + {"type": account_data_type, "content": content} + ) - if not ( - always_include or batch or account_data_events or ephemeral or full_state - ): - return + account_data_events = ( + sync_config.filter_collection.filter_room_account_data( + account_data_events + ) + ) - state = await self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, full_state=full_state - ) + ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) - summary: Optional[JsonDict] = {} - - # we include a summary in room responses when we're lazy loading - # members (as the client otherwise doesn't have enough info to form - # the name itself). - if sync_config.filter_collection.lazy_load_members() and ( - # we recalculate the summary: - # if there are membership changes in the timeline, or - # if membership has changed during a gappy sync, or - # if this is an initial sync. - any(ev.type == EventTypes.Member for ev in batch.events) - or ( - # XXX: this may include false positives in the form of LL - # members which have snuck into state - batch.limited - and any(t == EventTypes.Member for (t, k) in state) - ) - or since_token is None - ): - summary = await self.compute_summary( - room_id, sync_config, batch, state, now_token - ) + if not ( + always_include + or batch + or account_data_events + or ephemeral + or full_state + ): + return - if room_builder.rtype == "joined": - unread_notifications: Dict[str, int] = {} - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - ephemeral=ephemeral, - account_data=account_data_events, - unread_notifications=unread_notifications, - summary=summary, - unread_count=0, + state = await self.compute_state_delta( + room_id, + batch, + sync_config, + since_token, + now_token, + full_state=full_state, ) - if room_sync or always_include: - notifs = await self.unread_notifs_for_room_id(room_id, sync_config) + summary: Optional[JsonDict] = {} + + # we include a summary in room responses when we're lazy loading + # members (as the client otherwise doesn't have enough info to form + # the name itself). + if sync_config.filter_collection.lazy_load_members() and ( + # we recalculate the summary: + # if there are membership changes in the timeline, or + # if membership has changed during a gappy sync, or + # if this is an initial sync. + any(ev.type == EventTypes.Member for ev in batch.events) + or ( + # XXX: this may include false positives in the form of LL + # members which have snuck into state + batch.limited + and any(t == EventTypes.Member for (t, k) in state) + ) + or since_token is None + ): + summary = await self.compute_summary( + room_id, sync_config, batch, state, now_token + ) + + if room_builder.rtype == "joined": + unread_notifications: Dict[str, int] = {} + room_sync = JoinedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + ephemeral=ephemeral, + account_data=account_data_events, + unread_notifications=unread_notifications, + summary=summary, + unread_count=0, + ) - unread_notifications["notification_count"] = notifs["notify_count"] - unread_notifications["highlight_count"] = notifs["highlight_count"] + if room_sync or always_include: + notifs = await self.unread_notifs_for_room_id(room_id, sync_config) - room_sync.unread_count = notifs["unread_count"] + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] - sync_result_builder.joined.append(room_sync) + room_sync.unread_count = notifs["unread_count"] - if batch.limited and since_token: - user_id = sync_result_builder.sync_config.user.to_string() - logger.debug( - "Incremental gappy sync of %s for user %s with %d state events" - % (room_id, user_id, len(state)) + sync_result_builder.joined.append(room_sync) + + if batch.limited and since_token: + user_id = sync_result_builder.sync_config.user.to_string() + logger.debug( + "Incremental gappy sync of %s for user %s with %d state events" + % (room_id, user_id, len(state)) + ) + elif room_builder.rtype == "archived": + archived_room_sync = ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + account_data=account_data_events, ) - elif room_builder.rtype == "archived": - archived_room_sync = ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - account_data=account_data_events, - ) - if archived_room_sync or always_include: - sync_result_builder.archived.append(archived_room_sync) - else: - raise Exception("Unrecognized rtype: %r", room_builder.rtype) + if archived_room_sync or always_include: + sync_result_builder.archived.append(archived_room_sync) + else: + raise Exception("Unrecognized rtype: %r", room_builder.rtype) async def get_rooms_for_user_at( self, user_id: str, room_key: RoomStreamToken |