diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 07545e9496..c0e5192f00 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -48,6 +48,7 @@ from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
JsonDict,
+ JsonMapping,
PersistedEventPosition,
Requester,
RoomStreamToken,
@@ -360,6 +361,7 @@ class SlidingSyncHandler:
self.event_sources = hs.get_event_sources()
self.relations_handler = hs.get_relations_handler()
self.device_handler = hs.get_device_handler()
+ self.push_rules_handler = hs.get_push_rules_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.connection_store = SlidingSyncConnectionStore()
@@ -667,6 +669,7 @@ class SlidingSyncHandler:
extensions = await self.get_extensions_response(
sync_config=sync_config,
+ lists=lists,
from_token=from_token,
to_token=to_token,
)
@@ -1776,6 +1779,9 @@ class SlidingSyncHandler:
else:
assert from_bound is not None
+ # TODO: Limit the number of state events we're about to send down
+ # the room, if its too many we should change this to an
+ # `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
@@ -1830,8 +1836,14 @@ class SlidingSyncHandler:
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# But if we found a bump event, use that instead
if last_bump_event_result is not None:
- _, bump_event_pos = last_bump_event_result
- bump_stamp = bump_event_pos.stream
+ _, new_bump_event_pos = last_bump_event_result
+
+ # If we've just joined a remote room, then the last bump event may
+ # have been backfilled (and so have a negative stream ordering).
+ # These negative stream orderings can't sensibly be compared, so
+ # instead we use the membership event position.
+ if new_bump_event_pos.stream > 0:
+ bump_stamp = new_bump_event_pos.stream
return SlidingSyncResult.RoomResult(
name=room_name,
@@ -1863,6 +1875,7 @@ class SlidingSyncHandler:
async def get_extensions_response(
self,
sync_config: SlidingSyncConfig,
+ lists: Dict[str, SlidingSyncResult.SlidingWindowList],
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> SlidingSyncResult.Extensions:
@@ -1870,6 +1883,7 @@ class SlidingSyncHandler:
Args:
sync_config: Sync configuration
+ lists: Sliding window API. A map of list key to list results.
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
@@ -1894,9 +1908,20 @@ class SlidingSyncHandler:
from_token=from_token,
)
+ account_data_response = None
+ if sync_config.extensions.account_data is not None:
+ account_data_response = await self.get_account_data_extension_response(
+ sync_config=sync_config,
+ lists=lists,
+ account_data_request=sync_config.extensions.account_data,
+ to_token=to_token,
+ from_token=from_token,
+ )
+
return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,
+ account_data=account_data_response,
)
async def get_to_device_extension_response(
@@ -2023,6 +2048,128 @@ class SlidingSyncHandler:
device_unused_fallback_key_types=device_unused_fallback_key_types,
)
+ async def get_account_data_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]:
+ """Handle Account Data extension (MSC3959)
+
+ Args:
+ sync_config: Sync configuration
+ lists: Sliding window API. A map of list key to list results.
+ account_data_request: The account_data extension from the request
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+ user_id = sync_config.user.to_string()
+
+ # Skip if the extension is not enabled
+ if not account_data_request.enabled:
+ return None
+
+ global_account_data_map: Mapping[str, JsonMapping] = {}
+ if from_token is not None:
+ global_account_data_map = (
+ await self.store.get_updated_global_account_data_for_user(
+ user_id, from_token.stream_token.account_data_key
+ )
+ )
+
+ have_push_rules_changed = await self.store.have_push_rules_changed_for_user(
+ user_id, from_token.stream_token.push_rules_key
+ )
+ if have_push_rules_changed:
+ global_account_data_map = dict(global_account_data_map)
+ global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+ await self.push_rules_handler.push_rules_for_user(sync_config.user)
+ )
+ else:
+ all_global_account_data = await self.store.get_global_account_data_for_user(
+ user_id
+ )
+
+ global_account_data_map = dict(all_global_account_data)
+ global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+ await self.push_rules_handler.push_rules_for_user(sync_config.user)
+ )
+
+ # We only want to include account data for rooms that are already in the sliding
+ # sync response AND that were requested in the account data request.
+ relevant_room_ids: Set[str] = set()
+
+ # See what rooms from the room subscriptions we should get account data for
+ if (
+ account_data_request.rooms is not None
+ and sync_config.room_subscriptions is not None
+ ):
+ actual_room_ids = sync_config.room_subscriptions.keys()
+
+ for room_id in account_data_request.rooms:
+ # A wildcard means we process all rooms from the room subscriptions
+ if room_id == "*":
+ relevant_room_ids.update(sync_config.room_subscriptions.keys())
+ break
+
+ if room_id in actual_room_ids:
+ relevant_room_ids.add(room_id)
+
+ # See what rooms from the sliding window lists we should get account data for
+ if account_data_request.lists is not None:
+ for list_key in account_data_request.lists:
+ # Just some typing because we share the variable name in multiple places
+ actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
+
+ # A wildcard means we process rooms from all lists
+ if list_key == "*":
+ for actual_list in lists.values():
+ # We only expect a single SYNC operation for any list
+ assert len(actual_list.ops) == 1
+ sync_op = actual_list.ops[0]
+ assert sync_op.op == OperationType.SYNC
+
+ relevant_room_ids.update(sync_op.room_ids)
+
+ break
+
+ actual_list = lists.get(list_key)
+ if actual_list is not None:
+ # We only expect a single SYNC operation for any list
+ assert len(actual_list.ops) == 1
+ sync_op = actual_list.ops[0]
+ assert sync_op.op == OperationType.SYNC
+
+ relevant_room_ids.update(sync_op.room_ids)
+
+ # Fetch room account data
+ account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
+ if len(relevant_room_ids) > 0:
+ if from_token is not None:
+ account_data_by_room_map = (
+ await self.store.get_updated_room_account_data_for_user(
+ user_id, from_token.stream_token.account_data_key
+ )
+ )
+ else:
+ account_data_by_room_map = (
+ await self.store.get_room_account_data_for_user(user_id)
+ )
+
+ # Filter down to the relevant rooms
+ account_data_by_room_map = {
+ room_id: account_data_map
+ for room_id, account_data_map in account_data_by_room_map.items()
+ if room_id in relevant_room_ids
+ }
+
+ return SlidingSyncResult.Extensions.AccountDataExtension(
+ global_account_data_map=global_account_data_map,
+ account_data_by_room_map=account_data_by_room_map,
+ )
+
class HaveSentRoomFlag(Enum):
"""Flag for whether we have sent the room down a sliding sync connection.
@@ -2236,8 +2383,8 @@ class SlidingSyncConnectionStore:
user_id = sync_config.user.to_string()
- # If this is missing, only one sliding sync connection is allowed per
- # given conn_id.
+ # Only one sliding sync connection is allowed per given conn_id (empty
+ # or not).
conn_id = sync_config.conn_id or ""
if sync_config.requester.device_id:
|