diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4abb9b6127..1db5d68021 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -40,7 +40,7 @@ from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
-from synapse.storage.databases.main.event_push_actions import NotifCounts
+from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -128,6 +128,7 @@ class JoinedSyncResult:
ephemeral: List[JsonDict]
account_data: List[JsonDict]
unread_notifications: JsonDict
+ unread_thread_notifications: JsonDict
summary: Optional[JsonDict]
unread_count: int
@@ -1288,7 +1289,7 @@ class SyncHandler:
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
- ) -> NotifCounts:
+ ) -> RoomNotifCounts:
with Measure(self.clock, "unread_notifs_for_room_id"):
return await self.store.get_unread_event_push_actions_by_room_for_user(
@@ -1314,6 +1315,19 @@ class SyncHandler:
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
instance to signify that the sync calculation is complete.
"""
+
+ user_id = sync_config.user.to_string()
+ app_service = self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ # We no longer support AS users using /sync directly.
+ # See https://github.com/matrix-org/matrix-doc/issues/1144
+ raise NotImplementedError()
+
+ # Note: we get the users room list *before* we get the current token, this
+ # avoids checking back in history if rooms are joined after the token is fetched.
+ token_before_rooms = self.event_sources.get_current_token()
+ mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
+
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
@@ -1321,6 +1335,57 @@ class SyncHandler:
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})
+ # Since we fetched the users room list before the token, there's a small window
+ # during which membership events may have been persisted, so we fetch these now
+ # and modify the joined room list for any changes between the get_rooms_for_user
+ # call and the get_current_token call.
+ membership_change_events = []
+ if since_token:
+ membership_change_events = await self.store.get_membership_changes_for_user(
+ user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+ )
+
+ mem_last_change_by_room_id: Dict[str, EventBase] = {}
+ for event in membership_change_events:
+ mem_last_change_by_room_id[event.room_id] = event
+
+ # For the latest membership event in each room found, add/remove the room ID
+ # from the joined room list accordingly. In this case we only care if the
+ # latest change is JOIN.
+
+ for room_id, event in mem_last_change_by_room_id.items():
+ assert event.internal_metadata.stream_ordering
+ if (
+ event.internal_metadata.stream_ordering
+ < token_before_rooms.room_key.stream
+ ):
+ continue
+
+ logger.info(
+ "User membership change between getting rooms and current token: %s %s %s",
+ user_id,
+ event.membership,
+ room_id,
+ )
+ # User joined a room - we have to then check the room state to ensure we
+ # respect any bans if there's a race between the join and ban events.
+ if event.membership == Membership.JOIN:
+ user_ids_in_room = await self.store.get_users_in_room(room_id)
+ if user_id in user_ids_in_room:
+ mutable_joined_room_ids.add(room_id)
+ # The user left the room, or left and was re-invited but not joined yet
+ else:
+ mutable_joined_room_ids.discard(room_id)
+
+ # Now we have our list of joined room IDs, exclude as configured and freeze
+ joined_room_ids = frozenset(
+ (
+ room_id
+ for room_id in mutable_joined_room_ids
+ if room_id not in self.rooms_to_exclude
+ )
+ )
+
logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
@@ -1328,22 +1393,13 @@ class SyncHandler:
now_token,
)
- user_id = sync_config.user.to_string()
- app_service = self.store.get_app_service_by_user_id(user_id)
- if app_service:
- # We no longer support AS users using /sync directly.
- # See https://github.com/matrix-org/matrix-doc/issues/1144
- raise NotImplementedError()
- else:
- joined_room_ids = await self.get_rooms_for_user_at(
- user_id, now_token.room_key
- )
sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
+ membership_change_events=membership_change_events,
)
logger.debug("Fetching account data")
@@ -1824,19 +1880,12 @@ class SyncHandler:
Does not modify the `sync_result_builder`.
"""
- user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
- now_token = sync_result_builder.now_token
+ membership_change_events = sync_result_builder.membership_change_events
assert since_token
- # Get a list of membership change events that have happened to the user
- # requesting the sync.
- membership_changes = await self.store.get_membership_changes_for_user(
- user_id, since_token.room_key, now_token.room_key
- )
-
- if membership_changes:
+ if membership_change_events:
return True
stream_id = since_token.room_key.stream
@@ -1875,16 +1924,10 @@ class SyncHandler:
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
+ membership_change_events = sync_result_builder.membership_change_events
assert since_token
- # TODO: we've already called this function and ran this query in
- # _have_rooms_changed. We could keep the results in memory to avoid a
- # second query, at the cost of more complicated source code.
- membership_change_events = await self.store.get_membership_changes_for_user(
- user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
- )
-
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
@@ -2353,6 +2396,7 @@ class SyncHandler:
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
+ unread_thread_notifications={},
summary=summary,
unread_count=0,
)
@@ -2360,10 +2404,33 @@ class SyncHandler:
if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
- unread_notifications["notification_count"] = notifs.notify_count
- unread_notifications["highlight_count"] = notifs.highlight_count
-
- room_sync.unread_count = notifs.unread_count
+ # Notifications for the main timeline.
+ notify_count = notifs.main_timeline.notify_count
+ highlight_count = notifs.main_timeline.highlight_count
+ unread_count = notifs.main_timeline.unread_count
+
+ # Check the sync configuration.
+ if sync_config.filter_collection.unread_thread_notifications():
+ # And add info for each thread.
+ room_sync.unread_thread_notifications = {
+ thread_id: {
+ "notification_count": thread_notifs.notify_count,
+ "highlight_count": thread_notifs.highlight_count,
+ }
+ for thread_id, thread_notifs in notifs.threads.items()
+ if thread_id is not None
+ }
+
+ else:
+ # Combine the unread counts for all threads and main timeline.
+ for thread_notifs in notifs.threads.values():
+ notify_count += thread_notifs.notify_count
+ highlight_count += thread_notifs.highlight_count
+ unread_count += thread_notifs.unread_count
+
+ unread_notifications["notification_count"] = notify_count
+ unread_notifications["highlight_count"] = highlight_count
+ room_sync.unread_count = unread_count
sync_result_builder.joined.append(room_sync)
@@ -2385,60 +2452,6 @@ class SyncHandler:
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
- async def get_rooms_for_user_at(
- self,
- user_id: str,
- room_key: RoomStreamToken,
- ) -> FrozenSet[str]:
- """Get set of joined rooms for a user at the given stream ordering.
-
- The stream ordering *must* be recent, otherwise this may throw an
- exception if older than a month. (This function is called with the
- current token, which should be perfectly fine).
-
- Args:
- user_id
- stream_ordering
-
- ReturnValue:
- Set of room_ids the user is in at given stream_ordering.
- """
- joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
-
- joined_room_ids = set()
-
- # We need to check that the stream ordering of the join for each room
- # is before the stream_ordering asked for. This might not be the case
- # if the user joins a room between us getting the current token and
- # calling `get_rooms_for_user_with_stream_ordering`.
- # If the membership's stream ordering is after the given stream
- # ordering, we need to go and work out if the user was in the room
- # before.
- # We also need to check whether the room should be excluded from sync
- # responses as per the homeserver config.
- for joined_room in joined_rooms:
- if joined_room.room_id in self.rooms_to_exclude:
- continue
-
- if not joined_room.event_pos.persisted_after(room_key):
- joined_room_ids.add(joined_room.room_id)
- continue
-
- logger.info("User joined room after current token: %s", joined_room.room_id)
-
- extrems = (
- await self.store.get_forward_extremities_for_room_at_stream_ordering(
- joined_room.room_id, joined_room.event_pos.stream
- )
- )
- user_ids_in_room = await self.state.get_current_user_ids_in_room(
- joined_room.room_id, extrems
- )
- if user_id in user_ids_in_room:
- joined_room_ids.add(joined_room.room_id)
-
- return frozenset(joined_room_ids)
-
def _action_has_highlight(actions: List[JsonDict]) -> bool:
for action in actions:
@@ -2535,6 +2548,7 @@ class SyncResultBuilder:
since_token: Optional[StreamToken]
now_token: StreamToken
joined_room_ids: FrozenSet[str]
+ membership_change_events: List[EventBase]
presence: List[UserPresenceState] = attr.Factory(list)
account_data: List[JsonDict] = attr.Factory(list)
|