summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/sync.py267
1 files changed, 141 insertions, 126 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e017b28cd2..edfdb99cbd 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1533,21 +1533,18 @@ class SyncHandler:
         newly_left_rooms = room_changes.newly_left_rooms
 
         async def handle_room_entries(room_entry: "RoomSyncResultBuilder"):
-            with start_active_span("generate_room_entry"):
-                set_tag("room_id", room_entry.room_id)
-                log_kv({"events": len(room_entry.events or [])})
-                logger.debug("Generating room entry for %s", room_entry.room_id)
-                res = await self._generate_room_entry(
-                    sync_result_builder,
-                    ignored_users,
-                    room_entry,
-                    ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
-                    tags=tags_by_room.get(room_entry.room_id),
-                    account_data=account_data_by_room.get(room_entry.room_id, {}),
-                    always_include=sync_result_builder.full_state,
-                )
-                logger.debug("Generated room entry for %s", room_entry.room_id)
-                return res
+            logger.debug("Generating room entry for %s", room_entry.room_id)
+            res = await self._generate_room_entry(
+                sync_result_builder,
+                ignored_users,
+                room_entry,
+                ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
+                tags=tags_by_room.get(room_entry.room_id),
+                account_data=account_data_by_room.get(room_entry.room_id, {}),
+                always_include=sync_result_builder.full_state,
+            )
+            logger.debug("Generated room entry for %s", room_entry.room_id)
+            return res
 
         await concurrently_execute(handle_room_entries, room_entries, 10)
 
@@ -1960,139 +1957,157 @@ class SyncHandler:
         room_id = room_builder.room_id
         since_token = room_builder.since_token
         upto_token = room_builder.upto_token
-        log_kv(
-            {
-                "since_token": since_token,
-                "upto_token": upto_token,
-            }
-        )
 
-        batch = await self._load_filtered_recents(
-            room_id,
-            sync_config,
-            now_token=upto_token,
-            since_token=since_token,
-            potential_recents=events,
-            newly_joined_room=newly_joined,
-        )
-        log_kv(
-            {
-                "batch_events": len(batch.events),
-                "prev_batch": batch.prev_batch,
-                "batch_limited": batch.limited,
-            }
-        )
+        with start_active_span("generate_room_entry"):
+            set_tag("room_id", room_id)
+            log_kv({"events": len(events or ())})
 
-        # Note: `batch` can be both empty and limited here in the case where
-        # `_load_filtered_recents` can't find any events the user should see
-        # (e.g. due to having ignored the sender of the last 50 events).
+            log_kv(
+                {
+                    "since_token": since_token,
+                    "upto_token": upto_token,
+                }
+            )
 
-        if newly_joined:
-            # debug for https://github.com/matrix-org/synapse/issues/4422
-            issue4422_logger.debug(
-                "Timeline events after filtering in newly-joined room %s: %r",
+            batch = await self._load_filtered_recents(
                 room_id,
-                batch,
+                sync_config,
+                now_token=upto_token,
+                since_token=since_token,
+                potential_recents=events,
+                newly_joined_room=newly_joined,
+            )
+            log_kv(
+                {
+                    "batch_events": len(batch.events),
+                    "prev_batch": batch.prev_batch,
+                    "batch_limited": batch.limited,
+                }
             )
 
-        # When we join the room (or the client requests full_state), we should
-        # send down any existing tags. Usually the user won't have tags in a
-        # newly joined room, unless either a) they've joined before or b) the
-        # tag was added by synapse e.g. for server notice rooms.
-        if full_state:
-            user_id = sync_result_builder.sync_config.user.to_string()
-            tags = await self.store.get_tags_for_room(user_id, room_id)
+            # Note: `batch` can be both empty and limited here in the case where
+            # `_load_filtered_recents` can't find any events the user should see
+            # (e.g. due to having ignored the sender of the last 50 events).
 
-            # If there aren't any tags, don't send the empty tags list down
-            # sync
-            if not tags:
-                tags = None
+            if newly_joined:
+                # debug for https://github.com/matrix-org/synapse/issues/4422
+                issue4422_logger.debug(
+                    "Timeline events after filtering in newly-joined room %s: %r",
+                    room_id,
+                    batch,
+                )
 
-        account_data_events = []
-        if tags is not None:
-            account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
+            # When we join the room (or the client requests full_state), we should
+            # send down any existing tags. Usually the user won't have tags in a
+            # newly joined room, unless either a) they've joined before or b) the
+            # tag was added by synapse e.g. for server notice rooms.
+            if full_state:
+                user_id = sync_result_builder.sync_config.user.to_string()
+                tags = await self.store.get_tags_for_room(user_id, room_id)
 
-        for account_data_type, content in account_data.items():
-            account_data_events.append({"type": account_data_type, "content": content})
+                # If there aren't any tags, don't send the empty tags list down
+                # sync
+                if not tags:
+                    tags = None
 
-        account_data_events = sync_config.filter_collection.filter_room_account_data(
-            account_data_events
-        )
+            account_data_events = []
+            if tags is not None:
+                account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
 
-        ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+            for account_data_type, content in account_data.items():
+                account_data_events.append(
+                    {"type": account_data_type, "content": content}
+                )
 
-        if not (
-            always_include or batch or account_data_events or ephemeral or full_state
-        ):
-            return
+            account_data_events = (
+                sync_config.filter_collection.filter_room_account_data(
+                    account_data_events
+                )
+            )
 
-        state = await self.compute_state_delta(
-            room_id, batch, sync_config, since_token, now_token, full_state=full_state
-        )
+            ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
 
-        summary: Optional[JsonDict] = {}
-
-        # we include a summary in room responses when we're lazy loading
-        # members (as the client otherwise doesn't have enough info to form
-        # the name itself).
-        if sync_config.filter_collection.lazy_load_members() and (
-            # we recalculate the summary:
-            #   if there are membership changes in the timeline, or
-            #   if membership has changed during a gappy sync, or
-            #   if this is an initial sync.
-            any(ev.type == EventTypes.Member for ev in batch.events)
-            or (
-                # XXX: this may include false positives in the form of LL
-                # members which have snuck into state
-                batch.limited
-                and any(t == EventTypes.Member for (t, k) in state)
-            )
-            or since_token is None
-        ):
-            summary = await self.compute_summary(
-                room_id, sync_config, batch, state, now_token
-            )
+            if not (
+                always_include
+                or batch
+                or account_data_events
+                or ephemeral
+                or full_state
+            ):
+                return
 
-        if room_builder.rtype == "joined":
-            unread_notifications: Dict[str, int] = {}
-            room_sync = JoinedSyncResult(
-                room_id=room_id,
-                timeline=batch,
-                state=state,
-                ephemeral=ephemeral,
-                account_data=account_data_events,
-                unread_notifications=unread_notifications,
-                summary=summary,
-                unread_count=0,
+            state = await self.compute_state_delta(
+                room_id,
+                batch,
+                sync_config,
+                since_token,
+                now_token,
+                full_state=full_state,
             )
 
-            if room_sync or always_include:
-                notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
+            summary: Optional[JsonDict] = {}
+
+            # we include a summary in room responses when we're lazy loading
+            # members (as the client otherwise doesn't have enough info to form
+            # the name itself).
+            if sync_config.filter_collection.lazy_load_members() and (
+                # we recalculate the summary:
+                #   if there are membership changes in the timeline, or
+                #   if membership has changed during a gappy sync, or
+                #   if this is an initial sync.
+                any(ev.type == EventTypes.Member for ev in batch.events)
+                or (
+                    # XXX: this may include false positives in the form of LL
+                    # members which have snuck into state
+                    batch.limited
+                    and any(t == EventTypes.Member for (t, k) in state)
+                )
+                or since_token is None
+            ):
+                summary = await self.compute_summary(
+                    room_id, sync_config, batch, state, now_token
+                )
+
+            if room_builder.rtype == "joined":
+                unread_notifications: Dict[str, int] = {}
+                room_sync = JoinedSyncResult(
+                    room_id=room_id,
+                    timeline=batch,
+                    state=state,
+                    ephemeral=ephemeral,
+                    account_data=account_data_events,
+                    unread_notifications=unread_notifications,
+                    summary=summary,
+                    unread_count=0,
+                )
 
-                unread_notifications["notification_count"] = notifs["notify_count"]
-                unread_notifications["highlight_count"] = notifs["highlight_count"]
+                if room_sync or always_include:
+                    notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
 
-                room_sync.unread_count = notifs["unread_count"]
+                    unread_notifications["notification_count"] = notifs["notify_count"]
+                    unread_notifications["highlight_count"] = notifs["highlight_count"]
 
-                sync_result_builder.joined.append(room_sync)
+                    room_sync.unread_count = notifs["unread_count"]
 
-            if batch.limited and since_token:
-                user_id = sync_result_builder.sync_config.user.to_string()
-                logger.debug(
-                    "Incremental gappy sync of %s for user %s with %d state events"
-                    % (room_id, user_id, len(state))
+                    sync_result_builder.joined.append(room_sync)
+
+                if batch.limited and since_token:
+                    user_id = sync_result_builder.sync_config.user.to_string()
+                    logger.debug(
+                        "Incremental gappy sync of %s for user %s with %d state events"
+                        % (room_id, user_id, len(state))
+                    )
+            elif room_builder.rtype == "archived":
+                archived_room_sync = ArchivedSyncResult(
+                    room_id=room_id,
+                    timeline=batch,
+                    state=state,
+                    account_data=account_data_events,
                 )
-        elif room_builder.rtype == "archived":
-            archived_room_sync = ArchivedSyncResult(
-                room_id=room_id,
-                timeline=batch,
-                state=state,
-                account_data=account_data_events,
-            )
-            if archived_room_sync or always_include:
-                sync_result_builder.archived.append(archived_room_sync)
-        else:
-            raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+                if archived_room_sync or always_include:
+                    sync_result_builder.archived.append(archived_room_sync)
+            else:
+                raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
     async def get_rooms_for_user_at(
         self, user_id: str, room_key: RoomStreamToken