diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c8858b22dd..f1f19666d7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -17,7 +17,6 @@ from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
- Collection,
Dict,
FrozenSet,
List,
@@ -31,19 +30,30 @@ from typing import (
import attr
from prometheus_client import Counter
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import (
+ AccountDataTypes,
+ EventContentFields,
+ EventTypes,
+ Membership,
+)
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
+from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
from synapse.handlers.relations import BundledAggregations
+from synapse.logging import issue9533_logger
from synapse.logging.context import current_context
-from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
-from synapse.push.clientformat import format_push_rules_for_user
+from synapse.logging.opentracing import (
+ SynapseTags,
+ log_kv,
+ set_tag,
+ start_active_span,
+ trace,
+)
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.roommember import MemberSummary
-from synapse.storage.state import StateFilter
from synapse.types import (
DeviceListUpdates,
JsonDict,
@@ -51,10 +61,12 @@ from synapse.types import (
Requester,
RoomStreamToken,
StateMap,
+ StrCollection,
StreamKeyType,
StreamToken,
UserID,
)
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
@@ -249,6 +261,7 @@ class SyncHandler:
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
self._relations_handler = hs.get_relations_handler()
+ self._push_rules_handler = hs.get_push_rules_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
@@ -256,6 +269,9 @@ class SyncHandler:
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self._device_handler = hs.get_device_handler()
+ self._task_scheduler = hs.get_task_scheduler()
+
+ self.should_calculate_push_rules = hs.config.push.enable_push
# TODO: flush cache entries on subsequent sync request.
# Once we get the next /sync request (ie, one with the same access token
@@ -278,7 +294,7 @@ class SyncHandler:
expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
- self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
+ self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
async def wait_for_sync_for_user(
self,
@@ -346,13 +362,36 @@ class SyncHandler:
# (since we now know that the device has received them)
if since_token is not None:
since_stream_id = since_token.to_device_key
+ # Fast path: delete a limited number of to-device messages up front.
+ # We do this to avoid the overhead of scheduling a task for every
+ # sync.
+ device_deletion_limit = 100
deleted = await self.store.delete_messages_for_device(
- sync_config.user.to_string(), sync_config.device_id, since_stream_id
+ sync_config.user.to_string(),
+ sync_config.device_id,
+ since_stream_id,
+ limit=device_deletion_limit,
)
logger.debug(
"Deleted %d to-device messages up to %d", deleted, since_stream_id
)
+ # If we hit the limit, schedule a background task to delete the rest.
+ if deleted >= device_deletion_limit:
+ await self._task_scheduler.schedule_task(
+ DELETE_DEVICE_MSGS_TASK_NAME,
+ resource_id=sync_config.device_id,
+ params={
+ "user_id": sync_config.user.to_string(),
+ "device_id": sync_config.device_id,
+ "up_to_stream_id": since_stream_id,
+ },
+ )
+ logger.debug(
+ "Deletion of to-device messages up to %d scheduled",
+ since_stream_id,
+ )
+
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
@@ -373,16 +412,16 @@ class SyncHandler:
from_token=since_token,
)
- # if nothing has happened in any of the users' rooms since /sync was called,
- # the resultant next_batch will be the same as since_token (since the result
- # is generated when wait_for_events is first called, and not regenerated
- # when wait_for_events times out).
- #
- # If that happens, we mustn't cache it, so that when the client comes back
- # with the same cache token, we don't immediately return the same empty
- # result, causing a tightloop. (#8518)
- if result.next_batch == since_token:
- cache_context.should_cache = False
+ # if nothing has happened in any of the users' rooms since /sync was called,
+ # the resultant next_batch will be the same as since_token (since the result
+ # is generated when wait_for_events is first called, and not regenerated
+ # when wait_for_events times out).
+ #
+ # If that happens, we mustn't cache it, so that when the client comes back
+ # with the same cache token, we don't immediately return the same empty
+ # result, causing a tightloop. (#8518)
+ if result.next_batch == since_token:
+ cache_context.should_cache = False
if result:
if sync_config.filter_collection.lazy_load_members():
@@ -414,12 +453,6 @@ class SyncHandler:
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
- async def push_rules_for_user(self, user: UserID) -> Dict[str, Dict[str, list]]:
- user_id = user.to_string()
- rules_raw = await self.store.get_push_rules_for_user(user_id)
- rules = format_push_rules_for_user(user, rules_raw)
- return rules
-
async def ephemeral_by_room(
self,
sync_result_builder: "SyncResultBuilder",
@@ -929,6 +962,8 @@ class SyncHandler:
timeline_state = {}
+ # Membership events to fetch that can be found in the room state, or in
+ # the case of partial state rooms, the auth events of timeline events.
members_to_fetch = set()
first_event_by_sender_map = {}
for event in batch.events:
@@ -950,9 +985,19 @@ class SyncHandler:
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
# We only need apply this on full state syncs given we disabled
# LL for incr syncs in #3840.
- members_to_fetch.add(sync_config.user.to_string())
-
- state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+ # We don't insert ourselves into `members_to_fetch`, because in some
+ # rare cases (an empty event batch with a now_token after the user's
+ # leave in a partial state room which another local user has
+ # joined), the room state will be missing our membership and there
+ # is no guarantee that our membership will be in the auth events of
+ # timeline events when the room is partial stated.
+ state_filter = StateFilter.from_lazy_load_member_list(
+ members_to_fetch.union((sync_config.user.to_string(),))
+ )
+ else:
+ state_filter = StateFilter.from_lazy_load_member_list(
+ members_to_fetch
+ )
# We are happy to use partial state to compute the `/sync` response.
# Since partial state may not include the lazy-loaded memberships we
@@ -974,7 +1019,9 @@ class SyncHandler:
# sync's timeline and the start of the current sync's timeline.
# See the docstring above for details.
state_ids: StateMap[str]
-
+ # We need to know whether the state we fetch may be partial, so check
+ # whether the room is partial stated *before* fetching it.
+ is_partial_state_room = await self.store.is_partial_state_room(room_id)
if full_state:
if batch:
state_at_timeline_end = (
@@ -1105,7 +1152,7 @@ class SyncHandler:
# If we only have partial state for the room, `state_ids` may be missing the
# memberships we wanted. We attempt to find some by digging through the auth
# events of timeline events.
- if lazy_load_members and await self.store.is_partial_state_room(room_id):
+ if lazy_load_members and is_partial_state_room:
assert members_to_fetch is not None
assert first_event_by_sender_map is not None
@@ -1167,7 +1214,7 @@ class SyncHandler:
async def _find_missing_partial_state_memberships(
self,
room_id: str,
- members_to_fetch: Collection[str],
+ members_to_fetch: StrCollection,
events_with_membership_auth: Mapping[str, EventBase],
found_state_ids: StateMap[str],
) -> StateMap[str]:
@@ -1212,6 +1259,10 @@ class SyncHandler:
continue
event_with_membership_auth = events_with_membership_auth[member]
+ is_create = (
+ event_with_membership_auth.is_state()
+ and event_with_membership_auth.type == EventTypes.Create
+ )
is_join = (
event_with_membership_auth.is_state()
and event_with_membership_auth.type == EventTypes.Member
@@ -1219,9 +1270,10 @@ class SyncHandler:
and event_with_membership_auth.content.get("membership")
== Membership.JOIN
)
- if not is_join:
+ if not is_create and not is_join:
# The event must include the desired membership as an auth event, unless
- # it's the first join event for a given user.
+ # it's the `m.room.create` event for a room or the first join event for
+ # a given user.
missing_members.add(member)
auth_event_ids.update(event_with_membership_auth.auth_event_ids())
@@ -1276,8 +1328,13 @@ class SyncHandler:
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> RoomNotifCounts:
- with Measure(self.clock, "unread_notifs_for_room_id"):
+ if not self.should_calculate_push_rules:
+ # If push rules have been universally disabled then we know we won't
+ # have any unread counts in the DB, so we may as well skip asking
+ # the DB.
+ return RoomNotifCounts.empty()
+ with Measure(self.clock, "unread_notifs_for_room_id"):
return await self.store.get_unread_event_push_actions_by_room_for_user(
room_id,
sync_config.user.to_string(),
@@ -1328,7 +1385,10 @@ class SyncHandler:
membership_change_events = []
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
- user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
+ user_id,
+ since_token.room_key,
+ now_token.room_key,
+ self.rooms_to_exclude_globally,
)
mem_last_change_by_room_id: Dict[str, EventBase] = {}
@@ -1363,13 +1423,53 @@ class SyncHandler:
else:
mutable_joined_room_ids.discard(room_id)
- # Now we have our list of joined room IDs, exclude as configured and freeze
- joined_room_ids = frozenset(
- (
+ # Tweak the set of rooms to return to the client for eager (non-lazy) syncs.
+ mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally)
+ if not sync_config.filter_collection.lazy_load_members():
+ # Non-lazy syncs should never include partially stated rooms.
+ # Exclude all partially stated rooms from this sync.
+ results = await self.store.is_partial_state_room_batched(
+ mutable_joined_room_ids
+ )
+ mutable_rooms_to_exclude.update(
+ room_id
+ for room_id, is_partial_state in results.items()
+ if is_partial_state
+ )
+ membership_change_events = [
+ event
+ for event in membership_change_events
+ if not results.get(event.room_id, False)
+ ]
+
+ # Incremental eager syncs should additionally include rooms that
+ # - we are joined to
+ # - are full-stated
+ # - became fully-stated at some point during the sync period
+ # (These rooms will have been omitted during a previous eager sync.)
+ forced_newly_joined_room_ids: Set[str] = set()
+ if since_token and not sync_config.filter_collection.lazy_load_members():
+ un_partial_stated_rooms = (
+ await self.store.get_un_partial_stated_rooms_between(
+ since_token.un_partial_stated_rooms_key,
+ now_token.un_partial_stated_rooms_key,
+ mutable_joined_room_ids,
+ )
+ )
+ results = await self.store.is_partial_state_room_batched(
+ un_partial_stated_rooms
+ )
+ forced_newly_joined_room_ids.update(
room_id
- for room_id in mutable_joined_room_ids
- if room_id not in self.rooms_to_exclude
+ for room_id, is_partial_state in results.items()
+ if not is_partial_state
)
+
+ # Now we have our list of joined room IDs, exclude as configured and freeze
+ joined_room_ids = frozenset(
+ room_id
+ for room_id in mutable_joined_room_ids
+ if room_id not in mutable_rooms_to_exclude
)
logger.debug(
@@ -1385,45 +1485,76 @@ class SyncHandler:
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
+ excluded_room_ids=frozenset(mutable_rooms_to_exclude),
+ forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids),
membership_change_events=membership_change_events,
)
logger.debug("Fetching account data")
- account_data_by_room = await self._generate_sync_entry_for_account_data(
- sync_result_builder
- )
+ # Global account data is included if it is not filtered out.
+ if not sync_config.filter_collection.blocks_all_global_account_data():
+ await self._generate_sync_entry_for_account_data(sync_result_builder)
- logger.debug("Fetching room data")
-
- res = await self._generate_sync_entry_for_rooms(
- sync_result_builder, account_data_by_room
+ # Presence data is included if the server has it enabled and not filtered out.
+ include_presence_data = bool(
+ self.hs_config.server.use_presence
+ and not sync_config.filter_collection.blocks_all_presence()
)
- newly_joined_rooms, newly_joined_or_invited_or_knocked_users, _, _ = res
- _, _, newly_left_rooms, newly_left_users = res
+ # Device list updates are sent if a since token is provided.
+ include_device_list_updates = bool(since_token and since_token.device_list_key)
+
+ # If we do not care about the rooms or things which depend on the room
+ # data (namely presence and device list updates), then we can skip
+ # this process completely.
+ device_lists = DeviceListUpdates()
+ if (
+ not sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
+ or include_presence_data
+ or include_device_list_updates
+ ):
+ logger.debug("Fetching room data")
- block_all_presence_data = (
- since_token is None and sync_config.filter_collection.blocks_all_presence()
- )
- if self.hs_config.server.use_presence and not block_all_presence_data:
- logger.debug("Fetching presence data")
- await self._generate_sync_entry_for_presence(
- sync_result_builder,
+ # Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
+ # is used in calculate_user_changes below.
+ (
newly_joined_rooms,
- newly_joined_or_invited_or_knocked_users,
- )
+ newly_left_rooms,
+ ) = await self._generate_sync_entry_for_rooms(sync_result_builder)
+
+ # Work out which users have joined or left rooms we're in. We use this
+ # to build the presence and device_list parts of the sync response in
+ # `_generate_sync_entry_for_presence` and
+ # `_generate_sync_entry_for_device_list` respectively.
+ if include_presence_data or include_device_list_updates:
+ # This uses the sync_result_builder.joined which is set in
+ # `_generate_sync_entry_for_rooms`, if that didn't find any joined
+ # rooms for some reason it is a no-op.
+ (
+ newly_joined_or_invited_or_knocked_users,
+ newly_left_users,
+ ) = sync_result_builder.calculate_user_changes()
+
+ if include_presence_data:
+ logger.debug("Fetching presence data")
+ await self._generate_sync_entry_for_presence(
+ sync_result_builder,
+ newly_joined_rooms,
+ newly_joined_or_invited_or_knocked_users,
+ )
+
+ if include_device_list_updates:
+ device_lists = await self._generate_sync_entry_for_device_list(
+ sync_result_builder,
+ newly_joined_rooms=newly_joined_rooms,
+ newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
+ newly_left_rooms=newly_left_rooms,
+ newly_left_users=newly_left_users,
+ )
logger.debug("Fetching to-device data")
await self._generate_sync_entry_for_to_device(sync_result_builder)
- device_lists = await self._generate_sync_entry_for_device_list(
- sync_result_builder,
- newly_joined_rooms=newly_joined_rooms,
- newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
- newly_left_rooms=newly_left_rooms,
- newly_left_users=newly_left_users,
- )
-
logger.debug("Fetching OTK data")
device_id = sync_config.device_id
one_time_keys_count: JsonDict = {}
@@ -1436,7 +1567,7 @@ class SyncHandler:
one_time_keys_count = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
- unused_fallback_key_types = (
+ unused_fallback_key_types = list(
await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
)
@@ -1492,6 +1623,7 @@ class SyncHandler:
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
+ assert since_token is not None
# Take a copy since these fields will be mutated later.
newly_joined_or_invited_or_knocked_users = set(
@@ -1499,91 +1631,87 @@ class SyncHandler:
)
newly_left_users = set(newly_left_users)
- if since_token and since_token.device_list_key:
- # We want to figure out what user IDs the client should refetch
- # device keys for, and which users we aren't going to track changes
- # for anymore.
- #
- # For the first step we check:
- # a. if any users we share a room with have updated their devices,
- # and
- # b. we also check if we've joined any new rooms, or if a user has
- # joined a room we're in.
- #
- # For the second step we just find any users we no longer share a
- # room with by looking at all users that have left a room plus users
- # that were in a room we've left.
-
- users_that_have_changed = set()
-
- joined_rooms = sync_result_builder.joined_room_ids
-
- # Step 1a, check for changes in devices of users we share a room
- # with
- #
- # We do this in two different ways depending on what we have cached.
- # If we already have a list of all the user that have changed since
- # the last sync then it's likely more efficient to compare the rooms
- # they're in with the rooms the syncing user is in.
- #
- # If we don't have that info cached then we get all the users that
- # share a room with our user and check if those users have changed.
- changed_users = self.store.get_cached_device_list_changes(
- since_token.device_list_key
- )
- if changed_users is not None:
- result = await self.store.get_rooms_for_users(changed_users)
-
- for changed_user_id, entries in result.items():
- # Check if the changed user shares any rooms with the user,
- # or if the changed user is the syncing user (as we always
- # want to include device list updates of their own devices).
- if user_id == changed_user_id or any(
- rid in joined_rooms for rid in entries
- ):
- users_that_have_changed.add(changed_user_id)
- else:
- users_that_have_changed = (
- await self._device_handler.get_device_changes_in_shared_rooms(
- user_id,
- sync_result_builder.joined_room_ids,
- from_token=since_token,
- )
- )
+ # We want to figure out what user IDs the client should refetch
+ # device keys for, and which users we aren't going to track changes
+ # for anymore.
+ #
+ # For the first step we check:
+ # a. if any users we share a room with have updated their devices,
+ # and
+ # b. we also check if we've joined any new rooms, or if a user has
+ # joined a room we're in.
+ #
+ # For the second step we just find any users we no longer share a
+ # room with by looking at all users that have left a room plus users
+ # that were in a room we've left.
- # Step 1b, check for newly joined rooms
- for room_id in newly_joined_rooms:
- joined_users = await self.store.get_users_in_room(room_id)
- newly_joined_or_invited_or_knocked_users.update(joined_users)
+ users_that_have_changed = set()
- # TODO: Check that these users are actually new, i.e. either they
- # weren't in the previous sync *or* they left and rejoined.
- users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
+ joined_rooms = sync_result_builder.joined_room_ids
- user_signatures_changed = (
- await self.store.get_users_whose_signatures_changed(
- user_id, since_token.device_list_key
+ # Step 1a, check for changes in devices of users we share a room
+ # with
+ #
+ # We do this in two different ways depending on what we have cached.
+ # If we already have a list of all the user that have changed since
+ # the last sync then it's likely more efficient to compare the rooms
+ # they're in with the rooms the syncing user is in.
+ #
+ # If we don't have that info cached then we get all the users that
+ # share a room with our user and check if those users have changed.
+ cache_result = self.store.get_cached_device_list_changes(
+ since_token.device_list_key
+ )
+ if cache_result.hit:
+ changed_users = cache_result.entities
+
+ result = await self.store.get_rooms_for_users(changed_users)
+
+ for changed_user_id, entries in result.items():
+ # Check if the changed user shares any rooms with the user,
+ # or if the changed user is the syncing user (as we always
+ # want to include device list updates of their own devices).
+ if user_id == changed_user_id or any(
+ rid in joined_rooms for rid in entries
+ ):
+ users_that_have_changed.add(changed_user_id)
+ else:
+ users_that_have_changed = (
+ await self._device_handler.get_device_changes_in_shared_rooms(
+ user_id,
+ sync_result_builder.joined_room_ids,
+ from_token=since_token,
)
)
- users_that_have_changed.update(user_signatures_changed)
- # Now find users that we no longer track
- for room_id in newly_left_rooms:
- left_users = await self.store.get_users_in_room(room_id)
- newly_left_users.update(left_users)
+ # Step 1b, check for newly joined rooms
+ for room_id in newly_joined_rooms:
+ joined_users = await self.store.get_users_in_room(room_id)
+ newly_joined_or_invited_or_knocked_users.update(joined_users)
+
+ # TODO: Check that these users are actually new, i.e. either they
+ # weren't in the previous sync *or* they left and rejoined.
+ users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
- # Remove any users that we still share a room with.
- left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
- for user_id, entries in left_users_rooms.items():
- if any(rid in joined_rooms for rid in entries):
- newly_left_users.discard(user_id)
+ user_signatures_changed = await self.store.get_users_whose_signatures_changed(
+ user_id, since_token.device_list_key
+ )
+ users_that_have_changed.update(user_signatures_changed)
- return DeviceListUpdates(
- changed=users_that_have_changed, left=newly_left_users
- )
- else:
- return DeviceListUpdates()
+ # Now find users that we no longer track
+ for room_id in newly_left_rooms:
+ left_users = await self.store.get_users_in_room(room_id)
+ newly_left_users.update(left_users)
+ # Remove any users that we still share a room with.
+ left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
+ for user_id, entries in left_users_rooms.items():
+ if any(rid in joined_rooms for rid in entries):
+ newly_left_users.discard(user_id)
+
+ return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
+
+ @trace
async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder"
) -> None:
@@ -1603,19 +1731,29 @@ class SyncHandler:
)
for message in messages:
- # We pop here as we shouldn't be sending the message ID down
- # `/sync`
- message_id = message.pop("message_id", None)
- if message_id:
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+ log_kv(
+ {
+ "event": "to_device_message",
+ "sender": message["sender"],
+ "type": message["type"],
+ EventContentFields.TO_DEVICE_MSGID: message["content"].get(
+ EventContentFields.TO_DEVICE_MSGID
+ ),
+ }
+ )
- logger.debug(
- "Returning %d to-device messages between %d and %d (current token: %d)",
- len(messages),
- since_stream_id,
- stream_id,
- now_token.to_device_key,
- )
+ if messages and issue9533_logger.isEnabledFor(logging.DEBUG):
+ issue9533_logger.debug(
+ "Returning to-device messages with stream_ids (%d, %d]; now: %d;"
+ " msgids: %s",
+ since_stream_id,
+ stream_id,
+ now_token.to_device_key,
+ [
+ message["content"].get(EventContentFields.TO_DEVICE_MSGID)
+ for message in messages
+ ],
+ )
sync_result_builder.now_token = now_token.copy_and_replace(
StreamKeyType.TO_DEVICE, stream_id
)
@@ -1625,34 +1763,29 @@ class SyncHandler:
async def _generate_sync_entry_for_account_data(
self, sync_result_builder: "SyncResultBuilder"
- ) -> Dict[str, Dict[str, JsonDict]]:
- """Generates the account data portion of the sync response.
+ ) -> None:
+ """Generates the global account data portion of the sync response.
Account data (called "Client Config" in the spec) can be set either globally
or for a specific room. Account data consists of a list of events which
accumulate state, much like a room.
- This function retrieves global and per-room account data. The former is written
- to the given `sync_result_builder`. The latter is returned directly, to be
- later written to the `sync_result_builder` on a room-by-room basis.
+ This function retrieves global account data and writes it to the given
+ `sync_result_builder`. See `_generate_sync_entry_for_rooms` for handling
+ of per-room account data.
Args:
sync_result_builder
-
- Returns:
- A dictionary whose keys (room ids) map to the per room account data for that
- room.
"""
sync_config = sync_result_builder.sync_config
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
if since_token and not sync_result_builder.full_state:
- (
- global_account_data,
- account_data_by_room,
- ) = await self.store.get_updated_account_data_for_user(
- user_id, since_token.account_data_key
+ global_account_data = (
+ await self.store.get_updated_global_account_data_for_user(
+ user_id, since_token.account_data_key
+ )
)
push_rules_changed = await self.store.have_push_rules_changed_for_user(
@@ -1660,30 +1793,31 @@ class SyncHandler:
)
if push_rules_changed:
- global_account_data["m.push_rules"] = await self.push_rules_for_user(
- sync_config.user
- )
+ global_account_data = dict(global_account_data)
+ global_account_data[
+ AccountDataTypes.PUSH_RULES
+ ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
else:
- (
- global_account_data,
- account_data_by_room,
- ) = await self.store.get_account_data_for_user(sync_config.user.to_string())
-
- global_account_data["m.push_rules"] = await self.push_rules_for_user(
- sync_config.user
+ all_global_account_data = await self.store.get_global_account_data_for_user(
+ user_id
)
- account_data_for_user = await sync_config.filter_collection.filter_account_data(
- [
- {"type": account_data_type, "content": content}
- for account_data_type, content in global_account_data.items()
- ]
+ global_account_data = dict(all_global_account_data)
+ global_account_data[
+ AccountDataTypes.PUSH_RULES
+ ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
+
+ account_data_for_user = (
+ await sync_config.filter_collection.filter_global_account_data(
+ [
+ {"type": account_data_type, "content": content}
+ for account_data_type, content in global_account_data.items()
+ ]
+ )
)
sync_result_builder.account_data = account_data_for_user
- return account_data_by_room
-
async def _generate_sync_entry_for_presence(
self,
sync_result_builder: "SyncResultBuilder",
@@ -1743,10 +1877,8 @@ class SyncHandler:
sync_result_builder.presence = presence
async def _generate_sync_entry_for_rooms(
- self,
- sync_result_builder: "SyncResultBuilder",
- account_data_by_room: Dict[str, Dict[str, JsonDict]],
- ) -> Tuple[AbstractSet[str], AbstractSet[str], AbstractSet[str], AbstractSet[str]]:
+ self, sync_result_builder: "SyncResultBuilder"
+ ) -> Tuple[AbstractSet[str], AbstractSet[str]]:
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -1756,28 +1888,44 @@ class SyncHandler:
Args:
sync_result_builder
- account_data_by_room: Dictionary of per room account data
Returns:
- Returns a 4-tuple describing rooms the user has joined or left, and users who've
- joined or left rooms any rooms the user is in. This gets used later in
- `_generate_sync_entry_for_device_list`.
+ Returns a 2-tuple describing rooms the user has joined or left.
Its entries are:
- newly_joined_rooms
- - newly_joined_or_invited_or_knocked_users
- newly_left_rooms
- - newly_left_users
"""
+
since_token = sync_result_builder.since_token
+ user_id = sync_result_builder.sync_config.user.to_string()
+
+ blocks_all_rooms = (
+ sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
+ )
+
+ # 0. Start by fetching room account data (if required).
+ if (
+ blocks_all_rooms
+ or sync_result_builder.sync_config.filter_collection.blocks_all_room_account_data()
+ ):
+ account_data_by_room: Mapping[str, Mapping[str, JsonDict]] = {}
+ elif since_token and not sync_result_builder.full_state:
+ account_data_by_room = (
+ await self.store.get_updated_room_account_data_for_user(
+ user_id, since_token.account_data_key
+ )
+ )
+ else:
+ account_data_by_room = await self.store.get_room_account_data_for_user(
+ user_id
+ )
# 1. Start by fetching all ephemeral events in rooms we've joined (if required).
- user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = (
- since_token is None
- and sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
+ blocks_all_rooms
+ or sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
)
-
if block_all_room_ephemeral:
ephemeral_by_room: Dict[str, List[JsonDict]] = {}
else:
@@ -1800,19 +1948,21 @@ class SyncHandler:
)
if not tags_by_room:
logger.debug("no-oping sync")
- return set(), set(), set(), set()
+ return set(), set()
# 3. Work out which rooms need reporting in the sync response.
ignored_users = await self.store.ignored_users(user_id)
if since_token:
- room_changes = await self._get_rooms_changed(
+ room_changes = await self._get_room_changes_for_incremental_sync(
sync_result_builder, ignored_users
)
tags_by_room = await self.store.get_updated_tags(
user_id, since_token.account_data_key
)
else:
- room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
+ room_changes = await self._get_room_changes_for_initial_sync(
+ sync_result_builder, ignored_users
+ )
tags_by_room = await self.store.get_tags_for_user(user_id)
log_kv({"rooms_changed": len(room_changes.room_entries)})
@@ -1827,6 +1977,7 @@ class SyncHandler:
# joined or archived).
async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
logger.debug("Generating room entry for %s", room_entry.room_id)
+ # Note that this mutates sync_result_builder.{joined,archived}.
await self._generate_room_entry(
sync_result_builder,
room_entry,
@@ -1843,20 +1994,7 @@ class SyncHandler:
sync_result_builder.invited.extend(invited)
sync_result_builder.knocked.extend(knocked)
- # 5. Work out which users have joined or left rooms we're in. We use this
- # to build the device_list part of the sync response in
- # `_generate_sync_entry_for_device_list`.
- (
- newly_joined_or_invited_or_knocked_users,
- newly_left_users,
- ) = sync_result_builder.calculate_user_changes()
-
- return (
- set(newly_joined_rooms),
- newly_joined_or_invited_or_knocked_users,
- set(newly_left_rooms),
- newly_left_users,
- )
+ return set(newly_joined_rooms), set(newly_left_rooms)
async def _have_rooms_changed(
self, sync_result_builder: "SyncResultBuilder"
@@ -1871,7 +2009,7 @@ class SyncHandler:
assert since_token
- if membership_change_events:
+ if membership_change_events or sync_result_builder.forced_newly_joined_room_ids:
return True
stream_id = since_token.room_key.stream
@@ -1880,7 +2018,7 @@ class SyncHandler:
return True
return False
- async def _get_rooms_changed(
+ async def _get_room_changes_for_incremental_sync(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
@@ -1918,7 +2056,9 @@ class SyncHandler:
for event in membership_change_events:
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
- newly_joined_rooms: List[str] = []
+ newly_joined_rooms: List[str] = list(
+ sync_result_builder.forced_newly_joined_room_ids
+ )
newly_left_rooms: List[str] = []
room_entries: List[RoomSyncResultBuilder] = []
invited: List[InvitedSyncResult] = []
@@ -2124,7 +2264,7 @@ class SyncHandler:
newly_left_rooms,
)
- async def _get_all_rooms(
+ async def _get_room_changes_for_initial_sync(
self,
sync_result_builder: "SyncResultBuilder",
ignored_users: FrozenSet[str],
@@ -2149,7 +2289,7 @@ class SyncHandler:
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id,
membership_list=Membership.LIST,
- excluded_rooms=self.rooms_to_exclude,
+ excluded_rooms=sync_result_builder.excluded_room_ids,
)
room_entries = []
@@ -2209,8 +2349,8 @@ class SyncHandler:
sync_result_builder: "SyncResultBuilder",
room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict],
- tags: Optional[Dict[str, Dict[str, Any]]],
- account_data: Dict[str, JsonDict],
+ tags: Optional[Mapping[str, Mapping[str, Any]]],
+ account_data: Mapping[str, JsonDict],
always_include: bool = False,
) -> None:
"""Populates the `joined` and `archived` section of `sync_result_builder`
@@ -2307,7 +2447,9 @@ class SyncHandler:
account_data_events = []
if tags is not None:
- account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
+ account_data_events.append(
+ {"type": AccountDataTypes.TAG, "content": {"tags": tags}}
+ )
for account_data_type, content in account_data.items():
account_data_events.append(
@@ -2518,6 +2660,13 @@ class SyncResultBuilder:
since_token: The token supplied by user, or None.
now_token: The token to sync up to.
joined_room_ids: List of rooms the user is joined to
+ excluded_room_ids: Set of room ids we should omit from the /sync response.
+ forced_newly_joined_room_ids:
+ Rooms that should be presented in the /sync response as if they were
+ newly joined during the sync period, even if that's not the case.
+ (This is useful if the room was previously excluded from a /sync response,
+ and now the client should be made aware of it.)
+ Only used by incremental syncs.
# The following mirror the fields in a sync response
presence
@@ -2534,6 +2683,8 @@ class SyncResultBuilder:
since_token: Optional[StreamToken]
now_token: StreamToken
joined_room_ids: FrozenSet[str]
+ excluded_room_ids: FrozenSet[str]
+ forced_newly_joined_room_ids: FrozenSet[str]
membership_change_events: List[EventBase]
presence: List[UserPresenceState] = attr.Factory(list)
|