diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d42a414c90..259456b55d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,20 @@
# limitations under the License.
import itertools
import logging
-from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
+from typing import (
+ TYPE_CHECKING,
+ AbstractSet,
+ Any,
+ Collection,
+ Dict,
+ FrozenSet,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+)
import attr
from prometheus_client import Counter
@@ -27,7 +40,8 @@ from synapse.handlers.relations import BundledAggregations
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
-from synapse.storage.databases.main.event_push_actions import NotifCounts
+from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
@@ -89,7 +103,7 @@ class SyncConfig:
@attr.s(slots=True, frozen=True, auto_attribs=True)
class TimelineBatch:
prev_batch: StreamToken
- events: List[EventBase]
+ events: Sequence[EventBase]
limited: bool
# A mapping of event ID to the bundled aggregations for the above events.
# This is only calculated if limited is true.
@@ -115,6 +129,7 @@ class JoinedSyncResult:
ephemeral: List[JsonDict]
account_data: List[JsonDict]
unread_notifications: JsonDict
+ unread_thread_notifications: JsonDict
summary: Optional[JsonDict]
unread_count: int
@@ -507,10 +522,17 @@ class SyncHandler:
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
+ # FIXME(faster_joins): We use the partial state here as
+ # we don't want to block `/sync` on finishing a lazy join.
+ # Which should be fine once
+ # https://github.com/matrix-org/synapse/issues/12989 is resolved,
+ # since we shouldn't reach here anymore?
+ # Note that we use the current state as a whitelist for filtering
+ # `recents`, so partial state is only a problem when a membership
+ # event turns up in `recents` but has not made it into the current
+ # state.
current_state_ids_map = (
- await self._state_storage_controller.get_current_state_ids(
- room_id
- )
+ await self.store.get_partial_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())
@@ -579,7 +601,13 @@ class SyncHandler:
if any(e.is_state() for e in loaded_recents):
# FIXME(faster_joins): We use the partial state here as
# we don't want to block `/sync` on finishing a lazy join.
- # Is this the correct way of doing it?
+ # Which should be fine once
+ # https://github.com/matrix-org/synapse/issues/12989 is resolved,
+ # since we shouldn't reach here anymore?
+ # Note that we use the current state as a whitelist for filtering
+ # `loaded_recents`, so partial state is only a problem when a
+ # membership event turns up in `loaded_recents` but has not made it
+ # into the current state.
current_state_ids_map = (
await self.store.get_partial_current_state_ids(room_id)
)
@@ -627,7 +655,10 @@ class SyncHandler:
)
async def get_state_after_event(
- self, event_id: str, state_filter: Optional[StateFilter] = None
+ self,
+ event_id: str,
+ state_filter: Optional[StateFilter] = None,
+ await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the room state after the given event
@@ -635,9 +666,14 @@ class SyncHandler:
Args:
event_id: event of interest
state_filter: The state filter used to fetch state from the database.
+ await_full_state: if `True`, will block if we do not yet have complete state
+ at the event and `state_filter` is not satisfied by partial state.
+ Defaults to `True`.
"""
state_ids = await self._state_storage_controller.get_state_ids_for_event(
- event_id, state_filter=state_filter or StateFilter.all()
+ event_id,
+ state_filter=state_filter or StateFilter.all(),
+ await_full_state=await_full_state,
)
# using get_metadata_for_events here (instead of get_event) sidesteps an issue
@@ -660,6 +696,7 @@ class SyncHandler:
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
+ await_full_state: bool = True,
) -> StateMap[str]:
"""Get the room state at a particular stream position
@@ -667,6 +704,9 @@ class SyncHandler:
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
+ await_full_state: if `True`, will block if we do not yet have complete state
+ at the last event in the room before `stream_position` and
+ `state_filter` is not satisfied by partial state. Defaults to `True`.
"""
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
@@ -678,7 +718,9 @@ class SyncHandler:
if last_event_id:
state = await self.get_state_after_event(
- last_event_id, state_filter=state_filter or StateFilter.all()
+ last_event_id,
+ state_filter=state_filter or StateFilter.all(),
+ await_full_state=await_full_state,
)
else:
@@ -764,18 +806,6 @@ class SyncHandler:
if canonical_alias and canonical_alias.content.get("alias"):
return summary
- me = sync_config.user.to_string()
-
- joined_user_ids = [
- r[0] for r in details.get(Membership.JOIN, empty_ms).members if r[0] != me
- ]
- invited_user_ids = [
- r[0] for r in details.get(Membership.INVITE, empty_ms).members if r[0] != me
- ]
- gone_user_ids = [
- r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
- ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
-
# FIXME: only build up a member_ids list for our heroes
member_ids = {}
for membership in (
@@ -787,11 +817,8 @@ class SyncHandler:
for user_id, event_id in details.get(membership, empty_ms).members:
member_ids[user_id] = event_id
- # FIXME: order by stream ordering rather than as returned by SQL
- if joined_user_ids or invited_user_ids:
- summary["m.heroes"] = sorted(joined_user_ids + invited_user_ids)[0:5]
- else:
- summary["m.heroes"] = sorted(gone_user_ids)[0:5]
+ me = sync_config.user.to_string()
+ summary["m.heroes"] = extract_heroes_from_room_summary(details, me)
if not sync_config.filter_collection.lazy_load_members():
return summary
@@ -852,16 +879,26 @@ class SyncHandler:
now_token: StreamToken,
full_state: bool,
) -> MutableStateMap[EventBase]:
- """Works out the difference in state between the start of the timeline
- and the previous sync.
+ """Works out the difference in state between the end of the previous sync and
+ the start of the timeline.
Args:
room_id:
batch: The timeline batch for the room that will be sent to the user.
sync_config:
- since_token: Token of the end of the previous batch. May be None.
+ since_token: Token of the end of the previous batch. May be `None`.
now_token: Token of the end of the current batch.
full_state: Whether to force returning the full state.
+ `lazy_load_members` still applies when `full_state` is `True`.
+
+ Returns:
+ The state to return in the sync response for the room.
+
+ Clients will overlay this onto the state at the end of the previous sync to
+ arrive at the state at the start of the timeline.
+
+ Clients will then overlay state events in the timeline to arrive at the
+ state at the end of the timeline, in preparation for the next sync.
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
@@ -869,8 +906,17 @@ class SyncHandler:
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
+ # The memberships needed for events in the timeline.
+ # Only calculated when `lazy_load_members` is on.
+ members_to_fetch: Optional[Set[str]] = None
+
+ # A dictionary mapping user IDs to the first event in the timeline sent by
+ # them. Only calculated when `lazy_load_members` is on.
+ first_event_by_sender_map: Optional[Dict[str, EventBase]] = None
- members_to_fetch = None
+ # The contribution to the room state from state events in the timeline.
+ # Only contains the last event for any given state key.
+ timeline_state: StateMap[str]
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
@@ -881,10 +927,23 @@ class SyncHandler:
# We only request state for the members needed to display the
# timeline:
- members_to_fetch = {
- event.sender # FIXME: we also care about invite targets etc.
- for event in batch.events
- }
+ timeline_state = {}
+
+ members_to_fetch = set()
+ first_event_by_sender_map = {}
+ for event in batch.events:
+ # Build the map from user IDs to the first timeline event they sent.
+ if event.sender not in first_event_by_sender_map:
+ first_event_by_sender_map[event.sender] = event
+
+ # We need the event's sender, unless their membership was in a
+ # previous timeline event.
+ if (EventTypes.Member, event.sender) not in timeline_state:
+ members_to_fetch.add(event.sender)
+ # FIXME: we also care about invite targets etc.
+
+ if event.is_state():
+ timeline_state[(event.type, event.state_key)] = event.event_id
if full_state:
# always make sure we LL ourselves so we know we're in the room
@@ -894,55 +953,80 @@ class SyncHandler:
members_to_fetch.add(sync_config.user.to_string())
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+
+ # We are happy to use partial state to compute the `/sync` response.
+ # Since partial state may not include the lazy-loaded memberships we
+ # require, we fix up the state response afterwards with memberships from
+ # auth events.
+ await_full_state = False
else:
+ timeline_state = {
+ (event.type, event.state_key): event.event_id
+ for event in batch.events
+ if event.is_state()
+ }
+
state_filter = StateFilter.all()
+ await_full_state = True
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events
- if event.is_state()
- }
+ # Now calculate the state to return in the sync response for the room.
+ # This is more or less the change in state between the end of the previous
+ # sync's timeline and the start of the current sync's timeline.
+ # See the docstring above for details.
+ state_ids: StateMap[str]
if full_state:
if batch:
- current_state_ids = (
+ state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
- batch.events[-1].event_id, state_filter=state_filter
+ batch.events[-1].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
)
- state_ids = (
+ state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id, state_filter=state_filter
+ batch.events[0].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
)
else:
- current_state_ids = await self.get_state_at(
- room_id, stream_position=now_token, state_filter=state_filter
+ state_at_timeline_end = await self.get_state_at(
+ room_id,
+ stream_position=now_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
- state_ids = current_state_ids
+ state_at_timeline_start = state_at_timeline_end
state_ids = _calculate_state(
timeline_contains=timeline_state,
- timeline_start=state_ids,
- previous={},
- current=current_state_ids,
+ timeline_start=state_at_timeline_start,
+ timeline_end=state_at_timeline_end,
+ previous_timeline_end={},
lazy_load_members=lazy_load_members,
)
elif batch.limited:
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id, state_filter=state_filter
+ batch.events[0].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
- room_id, stream_position=now_token, state_filter=state_filter
+ room_id,
+ stream_position=now_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
# for now, we disable LL for gappy syncs - see
@@ -964,28 +1048,35 @@ class SyncHandler:
# is indeed the case.
assert since_token is not None
state_at_previous_sync = await self.get_state_at(
- room_id, stream_position=since_token, state_filter=state_filter
+ room_id,
+ stream_position=since_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
if batch:
- current_state_ids = (
+ state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
- batch.events[-1].event_id, state_filter=state_filter
+ batch.events[-1].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
)
else:
- # Its not clear how we get here, but empirically we do
- # (#5407). Logging has been added elsewhere to try and
- # figure out where this state comes from.
- current_state_ids = await self.get_state_at(
- room_id, stream_position=now_token, state_filter=state_filter
+ # We can get here if the user has ignored the senders of all
+ # the recent events.
+ state_at_timeline_end = await self.get_state_at(
+ room_id,
+ stream_position=now_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
)
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
- previous=state_at_previous_sync,
- current=current_state_ids,
+ timeline_end=state_at_timeline_end,
+ previous_timeline_end=state_at_previous_sync,
# we have to include LL members in case LL initial sync missed them
lazy_load_members=lazy_load_members,
)
@@ -1008,8 +1099,30 @@ class SyncHandler:
(EventTypes.Member, member)
for member in members_to_fetch
),
+ await_full_state=False,
)
+ # If we only have partial state for the room, `state_ids` may be missing the
+ # memberships we wanted. We attempt to find some by digging through the auth
+ # events of timeline events.
+ if lazy_load_members and await self.store.is_partial_state_room(room_id):
+ assert members_to_fetch is not None
+ assert first_event_by_sender_map is not None
+
+ additional_state_ids = (
+ await self._find_missing_partial_state_memberships(
+ room_id, members_to_fetch, first_event_by_sender_map, state_ids
+ )
+ )
+ state_ids = {**state_ids, **additional_state_ids}
+
+ # At this point, if `lazy_load_members` is enabled, `state_ids` includes
+ # the memberships of all event senders in the timeline. This is because we
+ # may not have sent the memberships in a previous sync.
+
+ # When `include_redundant_members` is on, we send all the lazy-loaded
+ # memberships of event senders. Otherwise we make an effort to limit the set
+ # of memberships we send to those that we have not already sent to this client.
if lazy_load_members and not include_redundant_members:
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.get_lazy_loaded_members_cache(cache_key)
@@ -1051,9 +1164,118 @@ class SyncHandler:
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
}
+ async def _find_missing_partial_state_memberships(
+ self,
+ room_id: str,
+ members_to_fetch: Collection[str],
+ events_with_membership_auth: Mapping[str, EventBase],
+ found_state_ids: StateMap[str],
+ ) -> StateMap[str]:
+ """Finds missing memberships from a set of auth events and returns them as a
+ state map.
+
+ Args:
+ room_id: The partial state room to find the remaining memberships for.
+ members_to_fetch: The memberships to find.
+ events_with_membership_auth: A mapping from user IDs to events whose auth
+ events would contain their prior membership, if one exists.
+ Note that join events will not cite a prior membership if a user has
+ never been in a room before.
+ found_state_ids: A dict from (type, state_key) -> state_event_id, containing
+ memberships that have been previously found. Entries in
+ `members_to_fetch` that have a membership in `found_state_ids` are
+ ignored.
+
+ Returns:
+ A dict from ("m.room.member", state_key) -> state_event_id, containing the
+ memberships missing from `found_state_ids`.
+
+ When `events_with_membership_auth` contains a join event for a given user
+ which does not cite a prior membership, no membership is returned for that
+ user.
+
+ Raises:
+ KeyError: if `events_with_membership_auth` does not have an entry for a
+ missing membership. Memberships in `found_state_ids` do not need an
+ entry in `events_with_membership_auth`.
+ """
+ additional_state_ids: MutableStateMap[str] = {}
+
+ # Tracks the missing members for logging purposes.
+ missing_members = set()
+
+ # Identify memberships missing from `found_state_ids` and pick out the auth
+ # events in which to look for them.
+ auth_event_ids: Set[str] = set()
+ for member in members_to_fetch:
+ if (EventTypes.Member, member) in found_state_ids:
+ continue
+
+ event_with_membership_auth = events_with_membership_auth[member]
+ is_join = (
+ event_with_membership_auth.is_state()
+ and event_with_membership_auth.type == EventTypes.Member
+ and event_with_membership_auth.state_key == member
+ and event_with_membership_auth.content.get("membership")
+ == Membership.JOIN
+ )
+ if not is_join:
+ # The event must include the desired membership as an auth event, unless
+ # it's the first join event for a given user.
+ missing_members.add(member)
+ auth_event_ids.update(event_with_membership_auth.auth_event_ids())
+
+ auth_events = await self.store.get_events(auth_event_ids)
+
+ # Run through the missing memberships once more, picking out the memberships
+ # from the pile of auth events we have just fetched.
+ for member in members_to_fetch:
+ if (EventTypes.Member, member) in found_state_ids:
+ continue
+
+ event_with_membership_auth = events_with_membership_auth[member]
+
+ # Dig through the auth events to find the desired membership.
+ for auth_event_id in event_with_membership_auth.auth_event_ids():
+ # We only store events once we have all their auth events,
+ # so the auth event must be in the pile we have just
+ # fetched.
+ auth_event = auth_events[auth_event_id]
+
+ if (
+ auth_event.type == EventTypes.Member
+ and auth_event.state_key == member
+ ):
+ missing_members.discard(member)
+ additional_state_ids[
+ (EventTypes.Member, member)
+ ] = auth_event.event_id
+ break
+
+ if missing_members:
+ # There really shouldn't be any missing memberships now. Either:
+ # * we couldn't find an auth event, which shouldn't happen because we do
+ # not persist events with persisting their auth events first, or
+ # * the set of auth events did not contain a membership we wanted, which
+ # means our caller didn't compute the events in `members_to_fetch`
+ # correctly, or we somehow accepted an event whose auth events were
+ # dodgy.
+ logger.error(
+ "Failed to find memberships for %s in partial state room "
+ "%s in the auth events of %s.",
+ missing_members,
+ room_id,
+ [
+ events_with_membership_auth[member].event_id
+ for member in missing_members
+ ],
+ )
+
+ return additional_state_ids
+
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
- ) -> NotifCounts:
+ ) -> RoomNotifCounts:
with Measure(self.clock, "unread_notifs_for_room_id"):
return await self.store.get_unread_event_push_actions_by_room_for_user(
@@ -1079,6 +1301,19 @@ class SyncHandler:
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
instance to signify that the sync calculation is complete.
"""
+
+ user_id = sync_config.user.to_string()
+ app_service = self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ # We no longer support AS users using /sync directly.
+ # See https://github.com/matrix-org/matrix-doc/issues/1144
+ raise NotImplementedError()
+
+ # Note: we get the users room list *before* we get the current token, this
+ # avoids checking back in history if rooms are joined after the token is fetched.
+ token_before_rooms = self.event_sources.get_current_token()
+ mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
+
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
@@ -1086,6 +1321,57 @@ class SyncHandler:
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})
+ # Since we fetched the users room list before the token, there's a small window
+ # during which membership events may have been persisted, so we fetch these now
+ # and modify the joined room list for any changes between the get_rooms_for_user
+ # call and the get_current_token call.
+ membership_change_events = []
+ if since_token:
+ membership_change_events = await self.store.get_membership_changes_for_user(
+ user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+ )
+
+ mem_last_change_by_room_id: Dict[str, EventBase] = {}
+ for event in membership_change_events:
+ mem_last_change_by_room_id[event.room_id] = event
+
+ # For the latest membership event in each room found, add/remove the room ID
+ # from the joined room list accordingly. In this case we only care if the
+ # latest change is JOIN.
+
+ for room_id, event in mem_last_change_by_room_id.items():
+ assert event.internal_metadata.stream_ordering
+ if (
+ event.internal_metadata.stream_ordering
+ < token_before_rooms.room_key.stream
+ ):
+ continue
+
+ logger.info(
+ "User membership change between getting rooms and current token: %s %s %s",
+ user_id,
+ event.membership,
+ room_id,
+ )
+ # User joined a room - we have to then check the room state to ensure we
+ # respect any bans if there's a race between the join and ban events.
+ if event.membership == Membership.JOIN:
+ user_ids_in_room = await self.store.get_users_in_room(room_id)
+ if user_id in user_ids_in_room:
+ mutable_joined_room_ids.add(room_id)
+ # The user left the room, or left and was re-invited but not joined yet
+ else:
+ mutable_joined_room_ids.discard(room_id)
+
+ # Now we have our list of joined room IDs, exclude as configured and freeze
+ joined_room_ids = frozenset(
+ (
+ room_id
+ for room_id in mutable_joined_room_ids
+ if room_id not in self.rooms_to_exclude
+ )
+ )
+
logger.debug(
"Calculating sync response for %r between %s and %s",
sync_config.user,
@@ -1093,22 +1379,13 @@ class SyncHandler:
now_token,
)
- user_id = sync_config.user.to_string()
- app_service = self.store.get_app_service_by_user_id(user_id)
- if app_service:
- # We no longer support AS users using /sync directly.
- # See https://github.com/matrix-org/matrix-doc/issues/1144
- raise NotImplementedError()
- else:
- joined_room_ids = await self.get_rooms_for_user_at(
- user_id, now_token.room_key
- )
sync_result_builder = SyncResultBuilder(
sync_config,
full_state,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
+ membership_change_events=membership_change_events,
)
logger.debug("Fetching account data")
@@ -1195,10 +1472,10 @@ class SyncHandler:
async def _generate_sync_entry_for_device_list(
self,
sync_result_builder: "SyncResultBuilder",
- newly_joined_rooms: Set[str],
- newly_joined_or_invited_or_knocked_users: Set[str],
- newly_left_rooms: Set[str],
- newly_left_users: Set[str],
+ newly_joined_rooms: AbstractSet[str],
+ newly_joined_or_invited_or_knocked_users: AbstractSet[str],
+ newly_left_rooms: AbstractSet[str],
+ newly_left_users: AbstractSet[str],
) -> DeviceListUpdates:
"""Generate the DeviceListUpdates section of sync
@@ -1216,8 +1493,7 @@ class SyncHandler:
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
- # We're going to mutate these fields, so lets copy them rather than
- # assume they won't get used later.
+ # Take a copy since these fields will be mutated later.
newly_joined_or_invited_or_knocked_users = set(
newly_joined_or_invited_or_knocked_users
)
@@ -1256,16 +1532,14 @@ class SyncHandler:
since_token.device_list_key
)
if changed_users is not None:
- result = await self.store.get_rooms_for_users_with_stream_ordering(
- changed_users
- )
+ result = await self.store.get_rooms_for_users(changed_users)
for changed_user_id, entries in result.items():
# Check if the changed user shares any rooms with the user,
# or if the changed user is the syncing user (as we always
# want to include device list updates of their own devices).
if user_id == changed_user_id or any(
- e.room_id in joined_rooms for e in entries
+ rid in joined_rooms for rid in entries
):
users_that_have_changed.add(changed_user_id)
else:
@@ -1299,13 +1573,9 @@ class SyncHandler:
newly_left_users.update(left_users)
# Remove any users that we still share a room with.
- left_users_rooms = (
- await self.store.get_rooms_for_users_with_stream_ordering(
- newly_left_users
- )
- )
+ left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
for user_id, entries in left_users_rooms.items():
- if any(e.room_id in joined_rooms for e in entries):
+ if any(rid in joined_rooms for rid in entries):
newly_left_users.discard(user_id)
return DeviceListUpdates(
@@ -1417,8 +1687,8 @@ class SyncHandler:
async def _generate_sync_entry_for_presence(
self,
sync_result_builder: "SyncResultBuilder",
- newly_joined_rooms: Set[str],
- newly_joined_or_invited_users: Set[str],
+ newly_joined_rooms: AbstractSet[str],
+ newly_joined_or_invited_users: AbstractSet[str],
) -> None:
"""Generates the presence portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -1476,7 +1746,7 @@ class SyncHandler:
self,
sync_result_builder: "SyncResultBuilder",
account_data_by_room: Dict[str, Dict[str, JsonDict]],
- ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
+ ) -> Tuple[AbstractSet[str], AbstractSet[str], AbstractSet[str], AbstractSet[str]]:
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -1536,15 +1806,13 @@ class SyncHandler:
ignored_users = await self.store.ignored_users(user_id)
if since_token:
room_changes = await self._get_rooms_changed(
- sync_result_builder, ignored_users, self.rooms_to_exclude
+ sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
- room_changes = await self._get_all_rooms(
- sync_result_builder, ignored_users, self.rooms_to_exclude
- )
+ room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
tags_by_room = await self.store.get_tags_for_user(user_id)
log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1598,19 +1866,12 @@ class SyncHandler:
Does not modify the `sync_result_builder`.
"""
- user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
- now_token = sync_result_builder.now_token
+ membership_change_events = sync_result_builder.membership_change_events
assert since_token
- # Get a list of membership change events that have happened to the user
- # requesting the sync.
- membership_changes = await self.store.get_membership_changes_for_user(
- user_id, since_token.room_key, now_token.room_key
- )
-
- if membership_changes:
+ if membership_change_events:
return True
stream_id = since_token.room_key.stream
@@ -1623,13 +1884,14 @@ class SyncHandler:
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
- excluded_rooms: List[str],
) -> _RoomChanges:
"""Determine the changes in rooms to report to the user.
This function is a first pass at generating the rooms part of the sync response.
It determines which rooms have changed during the sync period, and categorises
- them into four buckets: "knock", "invite", "join" and "leave".
+ them into four buckets: "knock", "invite", "join" and "leave". It also excludes
+ from that list any room that appears in the list of rooms to exclude from sync
+ results in the server configuration.
1. Finds all membership changes for the user in the sync period (from
`since_token` up to `now_token`).
@@ -1648,16 +1910,10 @@ class SyncHandler:
since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
+ membership_change_events = sync_result_builder.membership_change_events
assert since_token
- # TODO: we've already called this function and ran this query in
- # _have_rooms_changed. We could keep the results in memory to avoid a
- # second query, at the cost of more complicated source code.
- membership_change_events = await self.store.get_membership_changes_for_user(
- user_id, since_token.room_key, now_token.room_key, excluded_rooms
- )
-
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
@@ -1696,7 +1952,11 @@ class SyncHandler:
continue
if room_id in sync_result_builder.joined_room_ids or has_join:
- old_state_ids = await self.get_state_at(room_id, since_token)
+ old_state_ids = await self.get_state_at(
+ room_id,
+ since_token,
+ state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
+ )
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
if old_mem_ev_id:
@@ -1722,7 +1982,13 @@ class SyncHandler:
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
- old_state_ids = await self.get_state_at(room_id, since_token)
+ old_state_ids = await self.get_state_at(
+ room_id,
+ since_token,
+ state_filter=StateFilter.from_types(
+ [(EventTypes.Member, user_id)]
+ ),
+ )
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
)
@@ -1862,7 +2128,6 @@ class SyncHandler:
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
- ignored_rooms: List[str],
) -> _RoomChanges:
"""Returns entries for all rooms for the user.
@@ -1884,7 +2149,7 @@ class SyncHandler:
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
membership_list=Membership.LIST,
- excluded_rooms=ignored_rooms,
+ excluded_rooms=self.rooms_to_exclude,
)
room_entries = []
@@ -2117,6 +2382,7 @@ class SyncHandler:
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
+ unread_thread_notifications={},
summary=summary,
unread_count=0,
)
@@ -2124,10 +2390,33 @@ class SyncHandler:
if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
- unread_notifications["notification_count"] = notifs.notify_count
- unread_notifications["highlight_count"] = notifs.highlight_count
-
- room_sync.unread_count = notifs.unread_count
+ # Notifications for the main timeline.
+ notify_count = notifs.main_timeline.notify_count
+ highlight_count = notifs.main_timeline.highlight_count
+ unread_count = notifs.main_timeline.unread_count
+
+ # Check the sync configuration.
+ if sync_config.filter_collection.unread_thread_notifications():
+ # And add info for each thread.
+ room_sync.unread_thread_notifications = {
+ thread_id: {
+ "notification_count": thread_notifs.notify_count,
+ "highlight_count": thread_notifs.highlight_count,
+ }
+ for thread_id, thread_notifs in notifs.threads.items()
+ if thread_id is not None
+ }
+
+ else:
+ # Combine the unread counts for all threads and main timeline.
+ for thread_notifs in notifs.threads.values():
+ notify_count += thread_notifs.notify_count
+ highlight_count += thread_notifs.highlight_count
+ unread_count += thread_notifs.unread_count
+
+ unread_notifications["notification_count"] = notify_count
+ unread_notifications["highlight_count"] = highlight_count
+ room_sync.unread_count = unread_count
sync_result_builder.joined.append(room_sync)
@@ -2149,53 +2438,6 @@ class SyncHandler:
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
- async def get_rooms_for_user_at(
- self, user_id: str, room_key: RoomStreamToken
- ) -> FrozenSet[str]:
- """Get set of joined rooms for a user at the given stream ordering.
-
- The stream ordering *must* be recent, otherwise this may throw an
- exception if older than a month. (This function is called with the
- current token, which should be perfectly fine).
-
- Args:
- user_id
- stream_ordering
-
- ReturnValue:
- Set of room_ids the user is in at given stream_ordering.
- """
- joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
-
- joined_room_ids = set()
-
- # We need to check that the stream ordering of the join for each room
- # is before the stream_ordering asked for. This might not be the case
- # if the user joins a room between us getting the current token and
- # calling `get_rooms_for_user_with_stream_ordering`.
- # If the membership's stream ordering is after the given stream
- # ordering, we need to go and work out if the user was in the room
- # before.
- for joined_room in joined_rooms:
- if not joined_room.event_pos.persisted_after(room_key):
- joined_room_ids.add(joined_room.room_id)
- continue
-
- logger.info("User joined room after current token: %s", joined_room.room_id)
-
- extrems = (
- await self.store.get_forward_extremities_for_room_at_stream_ordering(
- joined_room.room_id, joined_room.event_pos.stream
- )
- )
- users_in_room = await self.state.get_current_users_in_room(
- joined_room.room_id, extrems
- )
- if user_id in users_in_room:
- joined_room_ids.add(joined_room.room_id)
-
- return frozenset(joined_room_ids)
-
def _action_has_highlight(actions: List[JsonDict]) -> bool:
for action in actions:
@@ -2211,8 +2453,8 @@ def _action_has_highlight(actions: List[JsonDict]) -> bool:
def _calculate_state(
timeline_contains: StateMap[str],
timeline_start: StateMap[str],
- previous: StateMap[str],
- current: StateMap[str],
+ timeline_end: StateMap[str],
+ previous_timeline_end: StateMap[str],
lazy_load_members: bool,
) -> StateMap[str]:
"""Works out what state to include in a sync response.
@@ -2220,45 +2462,50 @@ def _calculate_state(
Args:
timeline_contains: state in the timeline
timeline_start: state at the start of the timeline
- previous: state at the end of the previous sync (or empty dict
+ timeline_end: state at the end of the timeline
+ previous_timeline_end: state at the end of the previous sync (or empty dict
if this is an initial sync)
- current: state at the end of the timeline
lazy_load_members: whether to return members from timeline_start
or not. assumes that timeline_start has already been filtered to
include only the members the client needs to know about.
"""
- event_id_to_key = {
- e: key
- for key, e in itertools.chain(
+ event_id_to_state_key = {
+ event_id: state_key
+ for state_key, event_id in itertools.chain(
timeline_contains.items(),
- previous.items(),
timeline_start.items(),
- current.items(),
+ timeline_end.items(),
+ previous_timeline_end.items(),
)
}
- c_ids = set(current.values())
- ts_ids = set(timeline_start.values())
- p_ids = set(previous.values())
- tc_ids = set(timeline_contains.values())
+ timeline_end_ids = set(timeline_end.values())
+ timeline_start_ids = set(timeline_start.values())
+ previous_timeline_end_ids = set(previous_timeline_end.values())
+ timeline_contains_ids = set(timeline_contains.values())
# If we are lazyloading room members, we explicitly add the membership events
# for the senders in the timeline into the state block returned by /sync,
# as we may not have sent them to the client before. We find these membership
# events by filtering them out of timeline_start, which has already been filtered
# to only include membership events for the senders in the timeline.
- # In practice, we can do this by removing them from the p_ids list,
- # which is the list of relevant state we know we have already sent to the client.
+ # In practice, we can do this by removing them from the previous_timeline_end_ids
+ # list, which is the list of relevant state we know we have already sent to the
+ # client.
# see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
if lazy_load_members:
- p_ids.difference_update(
+ previous_timeline_end_ids.difference_update(
e for t, e in timeline_start.items() if t[0] == EventTypes.Member
)
- state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
+ state_ids = (
+ (timeline_end_ids | timeline_start_ids)
+ - previous_timeline_end_ids
+ - timeline_contains_ids
+ )
- return {event_id_to_key[e]: e for e in state_ids}
+ return {event_id_to_state_key[e]: e for e in state_ids}
@attr.s(slots=True, auto_attribs=True)
@@ -2287,6 +2534,7 @@ class SyncResultBuilder:
since_token: Optional[StreamToken]
now_token: StreamToken
joined_room_ids: FrozenSet[str]
+ membership_change_events: List[EventBase]
presence: List[UserPresenceState] = attr.Factory(list)
account_data: List[JsonDict] = attr.Factory(list)
@@ -2296,7 +2544,7 @@ class SyncResultBuilder:
archived: List[ArchivedSyncResult] = attr.Factory(list)
to_device: List[JsonDict] = attr.Factory(list)
- def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]:
+ def calculate_user_changes(self) -> Tuple[AbstractSet[str], AbstractSet[str]]:
"""Work out which other users have joined or left rooms we are joined to.
This data only is only useful for an incremental sync.
|