diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e017b28cd2..edfdb99cbd 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1533,21 +1533,18 @@ class SyncHandler:
newly_left_rooms = room_changes.newly_left_rooms
async def handle_room_entries(room_entry: "RoomSyncResultBuilder"):
- with start_active_span("generate_room_entry"):
- set_tag("room_id", room_entry.room_id)
- log_kv({"events": len(room_entry.events or [])})
- logger.debug("Generating room entry for %s", room_entry.room_id)
- res = await self._generate_room_entry(
- sync_result_builder,
- ignored_users,
- room_entry,
- ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
- tags=tags_by_room.get(room_entry.room_id),
- account_data=account_data_by_room.get(room_entry.room_id, {}),
- always_include=sync_result_builder.full_state,
- )
- logger.debug("Generated room entry for %s", room_entry.room_id)
- return res
+ logger.debug("Generating room entry for %s", room_entry.room_id)
+ res = await self._generate_room_entry(
+ sync_result_builder,
+ ignored_users,
+ room_entry,
+ ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
+ tags=tags_by_room.get(room_entry.room_id),
+ account_data=account_data_by_room.get(room_entry.room_id, {}),
+ always_include=sync_result_builder.full_state,
+ )
+ logger.debug("Generated room entry for %s", room_entry.room_id)
+ return res
await concurrently_execute(handle_room_entries, room_entries, 10)
@@ -1960,139 +1957,157 @@ class SyncHandler:
room_id = room_builder.room_id
since_token = room_builder.since_token
upto_token = room_builder.upto_token
- log_kv(
- {
- "since_token": since_token,
- "upto_token": upto_token,
- }
- )
- batch = await self._load_filtered_recents(
- room_id,
- sync_config,
- now_token=upto_token,
- since_token=since_token,
- potential_recents=events,
- newly_joined_room=newly_joined,
- )
- log_kv(
- {
- "batch_events": len(batch.events),
- "prev_batch": batch.prev_batch,
- "batch_limited": batch.limited,
- }
- )
+ with start_active_span("generate_room_entry"):
+ set_tag("room_id", room_id)
+ log_kv({"events": len(events or ())})
- # Note: `batch` can be both empty and limited here in the case where
- # `_load_filtered_recents` can't find any events the user should see
- # (e.g. due to having ignored the sender of the last 50 events).
+ log_kv(
+ {
+ "since_token": since_token,
+ "upto_token": upto_token,
+ }
+ )
- if newly_joined:
- # debug for https://github.com/matrix-org/synapse/issues/4422
- issue4422_logger.debug(
- "Timeline events after filtering in newly-joined room %s: %r",
+ batch = await self._load_filtered_recents(
room_id,
- batch,
+ sync_config,
+ now_token=upto_token,
+ since_token=since_token,
+ potential_recents=events,
+ newly_joined_room=newly_joined,
+ )
+ log_kv(
+ {
+ "batch_events": len(batch.events),
+ "prev_batch": batch.prev_batch,
+ "batch_limited": batch.limited,
+ }
)
- # When we join the room (or the client requests full_state), we should
- # send down any existing tags. Usually the user won't have tags in a
- # newly joined room, unless either a) they've joined before or b) the
- # tag was added by synapse e.g. for server notice rooms.
- if full_state:
- user_id = sync_result_builder.sync_config.user.to_string()
- tags = await self.store.get_tags_for_room(user_id, room_id)
+ # Note: `batch` can be both empty and limited here in the case where
+ # `_load_filtered_recents` can't find any events the user should see
+ # (e.g. due to having ignored the sender of the last 50 events).
- # If there aren't any tags, don't send the empty tags list down
- # sync
- if not tags:
- tags = None
+ if newly_joined:
+ # debug for https://github.com/matrix-org/synapse/issues/4422
+ issue4422_logger.debug(
+ "Timeline events after filtering in newly-joined room %s: %r",
+ room_id,
+ batch,
+ )
- account_data_events = []
- if tags is not None:
- account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
+ # When we join the room (or the client requests full_state), we should
+ # send down any existing tags. Usually the user won't have tags in a
+ # newly joined room, unless either a) they've joined before or b) the
+ # tag was added by synapse e.g. for server notice rooms.
+ if full_state:
+ user_id = sync_result_builder.sync_config.user.to_string()
+ tags = await self.store.get_tags_for_room(user_id, room_id)
- for account_data_type, content in account_data.items():
- account_data_events.append({"type": account_data_type, "content": content})
+ # If there aren't any tags, don't send the empty tags list down
+ # sync
+ if not tags:
+ tags = None
- account_data_events = sync_config.filter_collection.filter_room_account_data(
- account_data_events
- )
+ account_data_events = []
+ if tags is not None:
+ account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
- ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+ for account_data_type, content in account_data.items():
+ account_data_events.append(
+ {"type": account_data_type, "content": content}
+ )
- if not (
- always_include or batch or account_data_events or ephemeral or full_state
- ):
- return
+ account_data_events = (
+ sync_config.filter_collection.filter_room_account_data(
+ account_data_events
+ )
+ )
- state = await self.compute_state_delta(
- room_id, batch, sync_config, since_token, now_token, full_state=full_state
- )
+ ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
- summary: Optional[JsonDict] = {}
-
- # we include a summary in room responses when we're lazy loading
- # members (as the client otherwise doesn't have enough info to form
- # the name itself).
- if sync_config.filter_collection.lazy_load_members() and (
- # we recalculate the summary:
- # if there are membership changes in the timeline, or
- # if membership has changed during a gappy sync, or
- # if this is an initial sync.
- any(ev.type == EventTypes.Member for ev in batch.events)
- or (
- # XXX: this may include false positives in the form of LL
- # members which have snuck into state
- batch.limited
- and any(t == EventTypes.Member for (t, k) in state)
- )
- or since_token is None
- ):
- summary = await self.compute_summary(
- room_id, sync_config, batch, state, now_token
- )
+ if not (
+ always_include
+ or batch
+ or account_data_events
+ or ephemeral
+ or full_state
+ ):
+ return
- if room_builder.rtype == "joined":
- unread_notifications: Dict[str, int] = {}
- room_sync = JoinedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=state,
- ephemeral=ephemeral,
- account_data=account_data_events,
- unread_notifications=unread_notifications,
- summary=summary,
- unread_count=0,
+ state = await self.compute_state_delta(
+ room_id,
+ batch,
+ sync_config,
+ since_token,
+ now_token,
+ full_state=full_state,
)
- if room_sync or always_include:
- notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
+ summary: Optional[JsonDict] = {}
+
+ # we include a summary in room responses when we're lazy loading
+ # members (as the client otherwise doesn't have enough info to form
+ # the name itself).
+ if sync_config.filter_collection.lazy_load_members() and (
+ # we recalculate the summary:
+ # if there are membership changes in the timeline, or
+ # if membership has changed during a gappy sync, or
+ # if this is an initial sync.
+ any(ev.type == EventTypes.Member for ev in batch.events)
+ or (
+ # XXX: this may include false positives in the form of LL
+ # members which have snuck into state
+ batch.limited
+ and any(t == EventTypes.Member for (t, k) in state)
+ )
+ or since_token is None
+ ):
+ summary = await self.compute_summary(
+ room_id, sync_config, batch, state, now_token
+ )
+
+ if room_builder.rtype == "joined":
+ unread_notifications: Dict[str, int] = {}
+ room_sync = JoinedSyncResult(
+ room_id=room_id,
+ timeline=batch,
+ state=state,
+ ephemeral=ephemeral,
+ account_data=account_data_events,
+ unread_notifications=unread_notifications,
+ summary=summary,
+ unread_count=0,
+ )
- unread_notifications["notification_count"] = notifs["notify_count"]
- unread_notifications["highlight_count"] = notifs["highlight_count"]
+ if room_sync or always_include:
+ notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
- room_sync.unread_count = notifs["unread_count"]
+ unread_notifications["notification_count"] = notifs["notify_count"]
+ unread_notifications["highlight_count"] = notifs["highlight_count"]
- sync_result_builder.joined.append(room_sync)
+ room_sync.unread_count = notifs["unread_count"]
- if batch.limited and since_token:
- user_id = sync_result_builder.sync_config.user.to_string()
- logger.debug(
- "Incremental gappy sync of %s for user %s with %d state events"
- % (room_id, user_id, len(state))
+ sync_result_builder.joined.append(room_sync)
+
+ if batch.limited and since_token:
+ user_id = sync_result_builder.sync_config.user.to_string()
+ logger.debug(
+ "Incremental gappy sync of %s for user %s with %d state events"
+ % (room_id, user_id, len(state))
+ )
+ elif room_builder.rtype == "archived":
+ archived_room_sync = ArchivedSyncResult(
+ room_id=room_id,
+ timeline=batch,
+ state=state,
+ account_data=account_data_events,
)
- elif room_builder.rtype == "archived":
- archived_room_sync = ArchivedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=state,
- account_data=account_data_events,
- )
- if archived_room_sync or always_include:
- sync_result_builder.archived.append(archived_room_sync)
- else:
- raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+ if archived_room_sync or always_include:
+ sync_result_builder.archived.append(archived_room_sync)
+ else:
+ raise Exception("Unrecognized rtype: %r", room_builder.rtype)
async def get_rooms_for_user_at(
self, user_id: str, room_key: RoomStreamToken
|