diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9b3a4f638b..6fb8332f93 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tup
import attr
from prometheus_client import Counter
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.events import EventBase
from synapse.logging.context import current_context
@@ -87,7 +87,7 @@ class SyncConfig:
class TimelineBatch:
prev_batch = attr.ib(type=StreamToken)
events = attr.ib(type=List[EventBase])
- limited = attr.ib(bool)
+ limited = attr.ib(type=bool)
def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
@@ -201,6 +201,8 @@ class SyncResult:
device_lists: List of user_ids whose devices have changed
device_one_time_keys_count: Dict of algorithm to count for one time keys
for this device
+ device_unused_fallback_key_types: List of key types that have an unused fallback
+ key
groups: Group updates, if any
"""
@@ -213,6 +215,7 @@ class SyncResult:
to_device = attr.ib(type=List[JsonDict])
device_lists = attr.ib(type=DeviceLists)
device_one_time_keys_count = attr.ib(type=JsonDict)
+ device_unused_fallback_key_types = attr.ib(type=List[str])
groups = attr.ib(type=Optional[GroupsSyncResult])
def __bool__(self) -> bool:
@@ -457,8 +460,13 @@ class SyncHandler:
recents = []
if not limited or block_all_timeline:
+ prev_batch_token = now_token
+ if recents:
+ room_key = recents[0].internal_metadata.before
+ prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+
return TimelineBatch(
- events=recents, prev_batch=now_token, limited=False
+ events=recents, prev_batch=prev_batch_token, limited=False
)
filtering_factor = 2
@@ -519,7 +527,7 @@ class SyncHandler:
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
- room_key = RoomStreamToken.parse(recents[0].internal_metadata.before)
+ room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace("room_key", room_key)
@@ -967,7 +975,7 @@ class SyncHandler:
raise NotImplementedError()
else:
joined_room_ids = await self.get_rooms_for_user_at(
- user_id, now_token.room_stream_id
+ user_id, now_token.room_key
)
sync_result_builder = SyncResultBuilder(
sync_config,
@@ -1014,10 +1022,14 @@ class SyncHandler:
logger.debug("Fetching OTK data")
device_id = sync_config.device_id
one_time_key_counts = {} # type: JsonDict
+ unused_fallback_key_types = [] # type: List[str]
if device_id:
one_time_key_counts = await self.store.count_e2e_one_time_keys(
user_id, device_id
)
+ unused_fallback_key_types = await self.store.get_e2e_unused_fallback_key_types(
+ user_id, device_id
+ )
logger.debug("Fetching group data")
await self._generate_sync_entry_for_groups(sync_result_builder)
@@ -1041,6 +1053,7 @@ class SyncHandler:
device_lists=device_lists,
groups=sync_result_builder.groups,
device_one_time_keys_count=one_time_key_counts,
+ device_unused_fallback_key_types=unused_fallback_key_types,
next_batch=sync_result_builder.now_token,
)
@@ -1378,13 +1391,16 @@ class SyncHandler:
return set(), set(), set(), set()
ignored_account_data = await self.store.get_global_account_data_by_type_for_user(
- "m.ignored_user_list", user_id=user_id
+ AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
)
+ # If there is ignored users account data and it matches the proper type,
+ # then use it.
+ ignored_users = frozenset() # type: FrozenSet[str]
if ignored_account_data:
- ignored_users = ignored_account_data.get("ignored_users", {}).keys()
- else:
- ignored_users = frozenset()
+ ignored_users_data = ignored_account_data.get("ignored_users", {})
+ if isinstance(ignored_users_data, dict):
+ ignored_users = frozenset(ignored_users_data.keys())
if since_token:
room_changes = await self._get_rooms_changed(
@@ -1478,7 +1494,7 @@ class SyncHandler:
return False
async def _get_rooms_changed(
- self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+ self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
) -> _RoomChanges:
"""Gets the the changes that have happened since the last sync.
"""
@@ -1595,16 +1611,24 @@ class SyncHandler:
if leave_events:
leave_event = leave_events[-1]
- leave_stream_token = await self.store.get_stream_token_for_event(
+ leave_position = await self.store.get_position_for_event(
leave_event.event_id
)
- leave_token = since_token.copy_and_replace(
- "room_key", leave_stream_token
- )
- if since_token and since_token.is_after(leave_token):
+ # If the leave event happened before the since token then we
+ # bail.
+ if since_token and not leave_position.persisted_after(
+ since_token.room_key
+ ):
continue
+ # We can safely convert the position of the leave event into a
+ # stream token as it'll only be used in the context of this
+ # room. (c.f. the docstring of `to_room_stream_token`).
+ leave_token = since_token.copy_and_replace(
+ "room_key", leave_position.to_room_stream_token()
+ )
+
# If this is an out of band message, like a remote invite
# rejection, we include it in the recents batch. Otherwise, we
# let _load_filtered_recents handle fetching the correct
@@ -1682,7 +1706,7 @@ class SyncHandler:
return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms)
async def _get_all_rooms(
- self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+ self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
) -> _RoomChanges:
"""Returns entries for all rooms for the user.
@@ -1756,7 +1780,7 @@ class SyncHandler:
async def _generate_room_entry(
self,
sync_result_builder: "SyncResultBuilder",
- ignored_users: Set[str],
+ ignored_users: FrozenSet[str],
room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict],
tags: Optional[Dict[str, Dict[str, Any]]],
@@ -1916,7 +1940,7 @@ class SyncHandler:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
async def get_rooms_for_user_at(
- self, user_id: str, stream_ordering: int
+ self, user_id: str, room_key: RoomStreamToken
) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.
@@ -1942,15 +1966,15 @@ class SyncHandler:
# If the membership's stream ordering is after the given stream
# ordering, we need to go and work out if the user was in the room
# before.
- for room_id, membership_stream_ordering in joined_rooms:
- if membership_stream_ordering <= stream_ordering:
+ for room_id, event_pos in joined_rooms:
+ if not event_pos.persisted_after(room_key):
joined_room_ids.add(room_id)
continue
logger.info("User joined room after current token: %s", room_id)
extrems = await self.store.get_forward_extremeties_for_room(
- room_id, stream_ordering
+ room_id, event_pos.stream
)
users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
if user_id in users_in_room:
|