diff --git a/changelog.d/17447.feature b/changelog.d/17447.feature
new file mode 100644
index 0000000000..6f80e298ae
--- /dev/null
+++ b/changelog.d/17447.feature
@@ -0,0 +1 @@
+Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17476.doc b/changelog.d/17476.doc
new file mode 100644
index 0000000000..89d8d490bb
--- /dev/null
+++ b/changelog.d/17476.doc
@@ -0,0 +1 @@
+Update the [`allowed_local_3pids`](https://element-hq.github.io/synapse/v1.112/usage/configuration/config_documentation.html#allowed_local_3pids) config option's msisdn address to a working example.
diff --git a/changelog.d/17477.feature b/changelog.d/17477.feature
new file mode 100644
index 0000000000..9785a2ef7b
--- /dev/null
+++ b/changelog.d/17477.feature
@@ -0,0 +1 @@
+Add Account Data extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17479.misc b/changelog.d/17479.misc
new file mode 100644
index 0000000000..4502f71662
--- /dev/null
+++ b/changelog.d/17479.misc
@@ -0,0 +1 @@
+Do not send empty room entries down the experimental sliding sync endpoint.
diff --git a/changelog.d/17481.misc b/changelog.d/17481.misc
new file mode 100644
index 0000000000..ac55538424
--- /dev/null
+++ b/changelog.d/17481.misc
@@ -0,0 +1 @@
+Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`.
diff --git a/changelog.d/17482.misc b/changelog.d/17482.misc
new file mode 100644
index 0000000000..ac55538424
--- /dev/null
+++ b/changelog.d/17482.misc
@@ -0,0 +1 @@
+Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 649f4f71c7..40f64be856 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -2386,7 +2386,7 @@ enable_registration_without_verification: true
---
### `registrations_require_3pid`
-If this is set, users must provide all of the specified types of 3PID when registering an account.
+If this is set, users must provide all of the specified types of [3PID](https://spec.matrix.org/latest/appendices/#3pid-types) when registering an account.
Note that [`enable_registration`](#enable_registration) must also be set to allow account registration.
@@ -2411,6 +2411,9 @@ disable_msisdn_registration: true
Mandate that users are only allowed to associate certain formats of
3PIDs with accounts on this server, as specified by the `medium` and `pattern` sub-options.
+`pattern` is a [Perl-like regular expression](https://docs.python.org/3/library/re.html#module-re).
+
+More information about 3PIDs, allowed `medium` types and their `address` syntax can be found [in the Matrix spec](https://spec.matrix.org/latest/appendices/#3pid-types).
Example configuration:
```yaml
@@ -2420,7 +2423,7 @@ allowed_local_3pids:
- medium: email
pattern: '^[^@]+@vector\.im$'
- medium: msisdn
- pattern: '\+44'
+ pattern: '^44\d{10}$'
```
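
For context on why the old pattern never matched: the address Synapse checks is the canonicalised MSISDN, which (per the 3PID appendix linked above) is the country code plus number with no leading `+`. A quick sanity check of the new pattern, assuming standard Python `re` semantics:

```python
import re

# The address Synapse matches against is the canonicalised MSISDN, i.e. the
# country code and number with no leading "+", so the old '\+44' pattern
# could never match.
pattern = re.compile(r"^44\d{10}$")

assert pattern.match("447700900123")  # a UK number as Synapse sees it
assert not pattern.match("+447700900123")  # the "+" form is never passed in
assert not pattern.match("33612345678")  # wrong country code
```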
---
### `enable_3pid_lookup`
diff --git a/pyproject.toml b/pyproject.toml
index 0b5dc418e4..1adf8e087f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -201,8 +201,8 @@ netaddr = ">=0.7.18"
# add a lower bound to the Jinja2 dependency.
Jinja2 = ">=3.0"
bleach = ">=1.4.3"
-# We use `Self`, which were added in `typing-extensions` 4.0.
-typing-extensions = ">=4.0"
+# We use `assert_never`, which was added in `typing-extensions` 4.1.
+typing-extensions = ">=4.1"
# We enforce that we have a `cryptography` version that bundles an `openssl`
# with the latest security patches.
cryptography = ">=3.4.7"
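
The version bump above exists solely for `assert_never`, which the sliding sync handler below uses for exhaustiveness checks over `HaveSentRoomFlag`. A minimal sketch of the pattern (names here are illustrative, not Synapse code):

```python
from enum import Enum

from typing_extensions import assert_never


class Flag(Enum):
    NEVER = 1
    PREVIOUSLY = 2
    LIVE = 3


def describe(flag: Flag) -> str:
    if flag is Flag.NEVER:
        return "never sent down"
    elif flag is Flag.PREVIOUSLY:
        return "sent down, updates pending"
    elif flag is Flag.LIVE:
        return "fully caught up"
    else:
        # mypy marks this branch unreachable only when every member is handled
        # above; at runtime it raises if a new member slips through unhandled.
        assert_never(flag)
```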
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index f1f6f30b95..84f2fa18ff 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,6 +18,7 @@
#
#
import logging
+from enum import Enum
from itertools import chain
from typing import (
TYPE_CHECKING,
@@ -34,6 +35,7 @@ from typing import (
import attr
from immutabledict import immutabledict
+from typing_extensions import assert_never
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
@@ -46,11 +48,13 @@ from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
JsonDict,
+ JsonMapping,
PersistedEventPosition,
Requester,
RoomStreamToken,
SlidingSyncStreamToken,
StateMap,
+ StrCollection,
StreamKeyType,
StreamToken,
UserID,
@@ -357,8 +361,11 @@ class SlidingSyncHandler:
self.event_sources = hs.get_event_sources()
self.relations_handler = hs.get_relations_handler()
self.device_handler = hs.get_device_handler()
+ self.push_rules_handler = hs.get_push_rules_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
+ self.connection_store = SlidingSyncConnectionStore()
+
async def wait_for_sync_for_user(
self,
requester: Requester,
@@ -462,6 +469,11 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
+ await self.connection_store.mark_token_seen(
+ sync_config=sync_config,
+ from_token=from_token,
+ )
+
# Get all of the room IDs that the user should be able to see in the sync
# response
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@@ -607,11 +619,56 @@ class SlidingSyncHandler:
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
+        # Filter out rooms that we've sent down previously and that haven't
+        # received updates since.
+ if from_token:
+ rooms_should_send = set()
+
+ # First we check if there are rooms that match a list/room
+ # subscription and have updates we need to send (i.e. either because
+ # we haven't sent the room down, or we have but there are missing
+ # updates).
+ for room_id in relevant_room_map:
+ status = await self.connection_store.have_sent_room(
+ sync_config,
+ from_token.connection_position,
+ room_id,
+ )
+ if (
+ # The room was never sent down before so the client needs to know
+ # about it regardless of any updates.
+ status.status == HaveSentRoomFlag.NEVER
+ # `PREVIOUSLY` literally means the "room was sent down before *AND*
+ # there are updates we haven't sent down" so we already know this
+ # room has updates.
+ or status.status == HaveSentRoomFlag.PREVIOUSLY
+ ):
+ rooms_should_send.add(room_id)
+ elif status.status == HaveSentRoomFlag.LIVE:
+ # We know that we've sent all updates up until `from_token`,
+ # so we just need to check if there have been updates since
+ # then.
+ pass
+ else:
+ assert_never(status.status)
+
+ # We only need to check for new events since any state changes
+ # will also come down as new events.
+ rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+ relevant_room_map.keys(), from_token.stream_token.room_key
+ )
+ rooms_should_send.update(rooms_that_have_updates)
+ relevant_room_map = {
+ room_id: room_sync_config
+ for room_id, room_sync_config in relevant_room_map.items()
+ if room_id in rooms_should_send
+ }
+
@trace
@tag_args
async def handle_room(room_id: str) -> None:
room_sync_result = await self.get_room_sync_data(
- user=sync_config.user,
+ sync_config=sync_config,
room_id=room_id,
room_sync_config=relevant_room_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -621,22 +678,36 @@ class SlidingSyncHandler:
to_token=to_token,
)
- rooms[room_id] = room_sync_result
+ # Filter out empty room results during incremental sync
+ if room_sync_result or not from_token:
+ rooms[room_id] = room_sync_result
with start_active_span("sliding_sync.generate_room_entries"):
await concurrently_execute(handle_room, relevant_room_map, 10)
extensions = await self.get_extensions_response(
sync_config=sync_config,
+ lists=lists,
from_token=from_token,
to_token=to_token,
)
- # TODO: Update this when we implement per-connection state
- connection_token = 0
+ if has_lists or has_room_subscriptions:
+ connection_position = await self.connection_store.record_rooms(
+ sync_config=sync_config,
+ from_token=from_token,
+ sent_room_ids=relevant_room_map.keys(),
+ # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
+ unsent_room_ids=[],
+ )
+ elif from_token:
+ connection_position = from_token.connection_position
+ else:
+ # Initial sync without a `from_token` starts at `0`
+ connection_position = 0
return SlidingSyncResult(
- next_pos=SlidingSyncStreamToken(to_token, connection_token),
+ next_pos=SlidingSyncStreamToken(to_token, connection_position),
lists=lists,
rooms=rooms,
extensions=extensions,
@@ -1367,7 +1438,7 @@ class SlidingSyncHandler:
async def get_room_sync_data(
self,
- user: UserID,
+ sync_config: SlidingSyncConfig,
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
@@ -1389,6 +1460,37 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
"""
+ user = sync_config.user
+
+ # Determine whether we should limit the timeline to the token range.
+ #
+ # We should return historical messages (before token range) in the
+ # following cases because we want clients to be able to show a basic
+ # screen of information:
+ # - Initial sync (because no `from_token` to limit us anyway)
+ # - When users `newly_joined`
+ # - For an incremental sync where we haven't sent it down this
+ # connection before
+ from_bound = None
+ initial = True
+ if from_token and not room_membership_for_user_at_to_token.newly_joined:
+ room_status = await self.connection_store.have_sent_room(
+ sync_config=sync_config,
+ connection_token=from_token.connection_position,
+ room_id=room_id,
+ )
+ if room_status.status == HaveSentRoomFlag.LIVE:
+ from_bound = from_token.stream_token.room_key
+ initial = False
+ elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
+ assert room_status.last_token is not None
+ from_bound = room_status.last_token
+ initial = False
+ elif room_status.status == HaveSentRoomFlag.NEVER:
+ from_bound = None
+ initial = True
+ else:
+ assert_never(room_status.status)
# Assemble the list of timeline events
#
@@ -1415,36 +1517,23 @@ class SlidingSyncHandler:
prev_batch_token = to_token
# We're going to paginate backwards from the `to_token`
- from_bound = to_token.room_key
+ to_bound = to_token.room_key
# People shouldn't see past their leave/ban event
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
- from_bound = (
+ to_bound = (
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
- # Determine whether we should limit the timeline to the token range.
- #
- # We should return historical messages (before token range) in the
- # following cases because we want clients to be able to show a basic
- # screen of information:
- # - Initial sync (because no `from_token` to limit us anyway)
- # - When users `newly_joined`
- # - TODO: For an incremental sync where we haven't sent it down this
- # connection before
- to_bound = (
- from_token.stream_token.room_key
- if from_token is not None
- and not room_membership_for_user_at_to_token.newly_joined
- else None
- )
-
timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id,
- from_key=from_bound,
- to_key=to_bound,
+ # The bounds are reversed so we can paginate backwards
+ # (from newer to older events) starting at to_bound.
+ # This ensures we fill the `limit` with the newest events first,
+ from_key=to_bound,
+ to_key=from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
@@ -1561,12 +1650,6 @@ class SlidingSyncHandler:
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`.
- # TODO: Since we can't determine whether we've already sent a room down this
- # Sliding Sync connection before (we plan to add this optimization in the
- # future), we're always returning the requested room state instead of
- # updates.
- initial = True
-
# Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at(
room_id=room_id,
@@ -1712,9 +1795,22 @@ class SlidingSyncHandler:
to_token=to_token,
)
else:
- # TODO: Once we can figure out if we've sent a room down this connection before,
- # we can return updates instead of the full required state.
- raise NotImplementedError()
+ assert from_bound is not None
+
+            # TODO: Limit the number of state events we're about to send down
+            # for the room; if it's too many, should we change this to
+            # `initial=True`?
+ deltas = await self.store.get_current_state_deltas_for_room(
+ room_id=room_id,
+ from_token=from_bound,
+ to_token=to_token.room_key,
+ )
+ # TODO: Filter room state before fetching events
+ # TODO: Handle state resets where event_id is None
+ events = await self.store.get_events(
+ [d.event_id for d in deltas if d.event_id]
+ )
+ room_state = {(s.type, s.state_key): s for s in events.values()}
required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none():
@@ -1797,6 +1893,7 @@ class SlidingSyncHandler:
async def get_extensions_response(
self,
sync_config: SlidingSyncConfig,
+ lists: Dict[str, SlidingSyncResult.SlidingWindowList],
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> SlidingSyncResult.Extensions:
@@ -1804,6 +1901,7 @@ class SlidingSyncHandler:
Args:
sync_config: Sync configuration
+ lists: Sliding window API. A map of list key to list results.
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
@@ -1828,9 +1926,20 @@ class SlidingSyncHandler:
from_token=from_token,
)
+ account_data_response = None
+ if sync_config.extensions.account_data is not None:
+ account_data_response = await self.get_account_data_extension_response(
+ sync_config=sync_config,
+ lists=lists,
+ account_data_request=sync_config.extensions.account_data,
+ to_token=to_token,
+ from_token=from_token,
+ )
+
return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,
+ account_data=account_data_response,
)
async def get_to_device_extension_response(
@@ -1847,7 +1956,7 @@ class SlidingSyncHandler:
to_token: The point in the stream to sync up to.
"""
user_id = sync_config.user.to_string()
- device_id = sync_config.device_id
+ device_id = sync_config.requester.device_id
# Skip if the extension is not enabled
if not to_device_request.enabled:
@@ -1923,7 +2032,7 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
"""
user_id = sync_config.user.to_string()
- device_id = sync_config.device_id
+ device_id = sync_config.requester.device_id
# Skip if the extension is not enabled
if not e2ee_request.enabled:
@@ -1956,3 +2065,357 @@ class SlidingSyncHandler:
device_one_time_keys_count=device_one_time_keys_count,
device_unused_fallback_key_types=device_unused_fallback_key_types,
)
+
+ async def get_account_data_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]:
+ """Handle Account Data extension (MSC3959)
+
+ Args:
+ sync_config: Sync configuration
+ lists: Sliding window API. A map of list key to list results.
+ account_data_request: The account_data extension from the request
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+ user_id = sync_config.user.to_string()
+
+ # Skip if the extension is not enabled
+ if not account_data_request.enabled:
+ return None
+
+ global_account_data_map: Mapping[str, JsonMapping] = {}
+ if from_token is not None:
+ global_account_data_map = (
+ await self.store.get_updated_global_account_data_for_user(
+ user_id, from_token.stream_token.account_data_key
+ )
+ )
+
+            have_push_rules_changed = await self.store.have_push_rules_changed_for_user(
+                user_id, from_token.stream_token.push_rules_key
+            )
+            if have_push_rules_changed:
+                global_account_data_map = dict(global_account_data_map)
+                global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+                    await self.push_rules_handler.push_rules_for_user(sync_config.user)
+                )
+ else:
+ all_global_account_data = await self.store.get_global_account_data_for_user(
+ user_id
+ )
+
+ global_account_data_map = dict(all_global_account_data)
+ global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+ await self.push_rules_handler.push_rules_for_user(sync_config.user)
+ )
+
+ # We only want to include account data for rooms that are already in the sliding
+ # sync response AND that were requested in the account data request.
+ relevant_room_ids: Set[str] = set()
+
+ # See what rooms from the room subscriptions we should get account data for
+ if (
+ account_data_request.rooms is not None
+ and sync_config.room_subscriptions is not None
+ ):
+ actual_room_ids = sync_config.room_subscriptions.keys()
+
+ for room_id in account_data_request.rooms:
+ # A wildcard means we process all rooms from the room subscriptions
+ if room_id == "*":
+ relevant_room_ids.update(sync_config.room_subscriptions.keys())
+ break
+
+ if room_id in actual_room_ids:
+ relevant_room_ids.add(room_id)
+
+ # See what rooms from the sliding window lists we should get account data for
+ if account_data_request.lists is not None:
+ for list_key in account_data_request.lists:
+                # Just a type annotation, since we reuse this variable name in multiple places
+ actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
+
+ # A wildcard means we process rooms from all lists
+ if list_key == "*":
+ for actual_list in lists.values():
+ # We only expect a single SYNC operation for any list
+ assert len(actual_list.ops) == 1
+ sync_op = actual_list.ops[0]
+ assert sync_op.op == OperationType.SYNC
+
+ relevant_room_ids.update(sync_op.room_ids)
+
+ break
+
+ actual_list = lists.get(list_key)
+ if actual_list is not None:
+ # We only expect a single SYNC operation for any list
+ assert len(actual_list.ops) == 1
+ sync_op = actual_list.ops[0]
+ assert sync_op.op == OperationType.SYNC
+
+ relevant_room_ids.update(sync_op.room_ids)
+
+ # Fetch room account data
+ account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
+ if len(relevant_room_ids) > 0:
+ if from_token is not None:
+ account_data_by_room_map = (
+ await self.store.get_updated_room_account_data_for_user(
+ user_id, from_token.stream_token.account_data_key
+ )
+ )
+ else:
+ account_data_by_room_map = (
+ await self.store.get_room_account_data_for_user(user_id)
+ )
+
+ # Filter down to the relevant rooms
+ account_data_by_room_map = {
+ room_id: account_data_map
+ for room_id, account_data_map in account_data_by_room_map.items()
+ if room_id in relevant_room_ids
+ }
+
+ return SlidingSyncResult.Extensions.AccountDataExtension(
+ global_account_data_map=global_account_data_map,
+ account_data_by_room_map=account_data_by_room_map,
+ )
+
+
+class HaveSentRoomFlag(Enum):
+ """Flag for whether we have sent the room down a sliding sync connection.
+
+ The valid state changes here are:
+ NEVER -> LIVE
+ LIVE -> PREVIOUSLY
+ PREVIOUSLY -> LIVE
+ """
+
+ # The room has never been sent down (or we have forgotten we have sent it
+ # down).
+ NEVER = 1
+
+ # We have previously sent the room down, but there are updates that we
+ # haven't sent down.
+ PREVIOUSLY = 2
+
+ # We have sent the room down and the client has received all updates.
+ LIVE = 3
+
+
+@attr.s(auto_attribs=True, slots=True, frozen=True)
+class HaveSentRoom:
+ """Whether we have sent the room down a sliding sync connection.
+
+ Attributes:
+        status: Flag indicating whether we have sent the room down.
+        last_token: If the flag is `PREVIOUSLY` then this is non-null and
+            contains the stream token of the last updates we sent down for
+            the room, i.e. we still need to send everything since then to
+            the client.
+ """
+
+ status: HaveSentRoomFlag
+ last_token: Optional[RoomStreamToken]
+
+ @staticmethod
+ def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+ """Constructor for `PREVIOUSLY` flag."""
+ return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+
+
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
+HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+
+
+@attr.s(auto_attribs=True)
+class SlidingSyncConnectionStore:
+ """In-memory store of per-connection state, including what rooms we have
+ previously sent down a sliding sync connection.
+
+ Note: This is NOT safe to run in a worker setup because connection positions will
+ point to different sets of rooms on different workers. e.g. for the same connection,
+ a connection position of 5 might have totally different states on worker A and
+ worker B.
+
+    One complication that we need to deal with here is handling resent
+    requests: if we sent a room down in a response that the client never
+    received, we must consider the room *not* sent when we get the same
+    request again.
+
+ This is handled by using an integer "token", which is returned to the client
+ as part of the sync token. For each connection we store a mapping from
+ tokens to the room states, and create a new entry when we send down new
+ rooms.
+
+ Note that for any given sliding sync connection we will only store a maximum
+ of two different tokens: the previous token from the request and a new token
+ sent in the response. When we receive a request with a given token, we then
+ clear out all other entries with a different token.
+
+ Attributes:
+ _connections: Mapping from `(user_id, conn_id)` to mapping of `token`
+ to mapping of room ID to `HaveSentRoom`.
+ """
+
+ # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
+ _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
+ attr.Factory(dict)
+ )
+
+ async def have_sent_room(
+ self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
+ ) -> HaveSentRoom:
+ """For the given user_id/conn_id/token, return whether we have
+ previously sent the room down
+ """
+
+ conn_key = self._get_connection_key(sync_config)
+ sync_statuses = self._connections.setdefault(conn_key, {})
+ room_status = sync_statuses.get(connection_token, {}).get(
+ room_id, HAVE_SENT_ROOM_NEVER
+ )
+
+ return room_status
+
+ async def record_rooms(
+ self,
+ sync_config: SlidingSyncConfig,
+ from_token: Optional[SlidingSyncStreamToken],
+ *,
+ sent_room_ids: StrCollection,
+ unsent_room_ids: StrCollection,
+ ) -> int:
+ """Record which rooms we have/haven't sent down in a new response
+
+        Args:
+ sync_config
+ from_token: The since token from the request, if any
+ sent_room_ids: The set of room IDs that we have sent down as
+ part of this request (only needs to be ones we didn't
+                previously send down).
+ unsent_room_ids: The set of room IDs that have had updates
+ since the `from_token`, but which were not included in
+ this request
+ """
+ prev_connection_token = 0
+ if from_token is not None:
+ prev_connection_token = from_token.connection_position
+
+ # If there are no changes then this is a noop.
+ if not sent_room_ids and not unsent_room_ids:
+ return prev_connection_token
+
+ conn_key = self._get_connection_key(sync_config)
+ sync_statuses = self._connections.setdefault(conn_key, {})
+
+ # Generate a new token, removing any existing entries in that token
+ # (which can happen if requests get resent).
+ new_store_token = prev_connection_token + 1
+ sync_statuses.pop(new_store_token, None)
+
+ # Copy over and update the room mappings.
+ new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
+
+        # Whether we have updated `new_room_statuses`; if we haven't by the
+        # end, we can treat this as a noop.
+ have_updated = False
+ for room_id in sent_room_ids:
+ new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
+ have_updated = True
+
+ # Whether we add/update the entries for unsent rooms depends on the
+ # existing entry:
+ # - LIVE: We have previously sent down everything up to
+        #     `last_room_token`, so we update the entry to be `PREVIOUSLY` with
+ # `last_room_token`.
+ # - PREVIOUSLY: We have previously sent down everything up to *a*
+ # given token, so we don't need to update the entry.
+ # - NEVER: We have never previously sent down the room, and we haven't
+ # sent anything down this time either so we leave it as NEVER.
+
+ # Work out the new state for unsent rooms that were `LIVE`.
+ if from_token:
+ new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
+ else:
+ new_unsent_state = HAVE_SENT_ROOM_NEVER
+
+ for room_id in unsent_room_ids:
+ prev_state = new_room_statuses.get(room_id)
+ if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
+ new_room_statuses[room_id] = new_unsent_state
+ have_updated = True
+
+ if not have_updated:
+ return prev_connection_token
+
+ sync_statuses[new_store_token] = new_room_statuses
+
+ return new_store_token
+
+ async def mark_token_seen(
+ self,
+ sync_config: SlidingSyncConfig,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> None:
+ """We have received a request with the given token, so we can clear out
+ any other tokens associated with the connection.
+
+ If there is no from token then we have started afresh, and so we delete
+ all tokens associated with the device.
+ """
+        # Clear out any tokens for the connection that don't match the one
+ # from the request.
+
+ conn_key = self._get_connection_key(sync_config)
+ sync_statuses = self._connections.pop(conn_key, {})
+ if from_token is None:
+ return
+
+ sync_statuses = {
+ connection_token: room_statuses
+ for connection_token, room_statuses in sync_statuses.items()
+ if connection_token == from_token.connection_position
+ }
+ if sync_statuses:
+ self._connections[conn_key] = sync_statuses
+
+ @staticmethod
+ def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
+ """Return a unique identifier for this connection.
+
+ The first part is simply the user ID.
+
+ The second part is generally a combination of device ID and conn_id.
+        However, both of these are optional (e.g. puppet access tokens don't
+ have device IDs), so this handles those edge cases.
+
+ We use this over the raw `conn_id` to avoid clashes between different
+ clients that use the same `conn_id`. Imagine a user uses a web client
+ that uses `conn_id: main_sync_loop` and an Android client that also has
+ a `conn_id: main_sync_loop`.
+ """
+
+ user_id = sync_config.user.to_string()
+
+ # Only one sliding sync connection is allowed per given conn_id (empty
+ # or not).
+ conn_id = sync_config.conn_id or ""
+
+ if sync_config.requester.device_id:
+ return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
+
+ if sync_config.requester.access_token_id:
+ # If we don't have a device, then the access token ID should be a
+ # stable ID.
+ return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
+
+        # If we have neither then it's likely an AS or some weird token. Either
+ # way we can just fail here.
+ raise Exception("Cannot use sliding sync with access token type")
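
The `SlidingSyncConnectionStore` docstring above describes the token lifecycle in prose; here is a minimal standalone sketch of that lifecycle (plain dicts and strings rather than Synapse's real classes), showing how resending a position discards the newer, unacknowledged entry:

```python
from typing import Dict, Tuple

# Toy model of the store above: connection positions map to per-room states
# ("LIVE" stands in for HaveSentRoom). Names are illustrative, not Synapse's
# real API.
store: Dict[int, Dict[str, str]] = {}


def record(prev_pos: int, sent_room_ids: Tuple[str, ...]) -> int:
    """Like `record_rooms`: copy the acked position's state and bump it."""
    new_pos = prev_pos + 1
    statuses = dict(store.get(prev_pos, {}))
    statuses.update({room_id: "LIVE" for room_id in sent_room_ids})
    store[new_pos] = statuses
    return new_pos


def mark_seen(pos: int) -> None:
    """Like `mark_token_seen`: the client echoed `pos`, so drop the rest."""
    for stale in [p for p in store if p != pos]:
        del store[stale]


pos1 = record(0, ("!a:x",))  # response 1 sends !a:x
pos2 = record(pos1, ("!b:x",))  # response 2 sends !b:x on top of that
# The client never receives response 2 and resends pos1; the pos2 entry is
# dropped, so !b:x counts as never sent and goes down in full next time.
mark_seen(pos1)
assert pos2 not in store
assert store[pos1] == {"!a:x": "LIVE"}
```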
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index d72dfa2b10..bf3ac8d483 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet):
)
user = requester.user
- device_id = requester.device_id
timeout = parse_integer(request, "timeout", default=0)
# Position in the stream
@@ -902,11 +901,12 @@ class SlidingSyncRestServlet(RestServlet):
sync_config = SlidingSyncConfig(
user=user,
- device_id=device_id,
+ requester=requester,
# FIXME: Currently, we're just manually copying the fields from the
- # `SlidingSyncBody` into the config. How can we gurantee into the future
+ # `SlidingSyncBody` into the config. How can we guarantee into the future
# that we don't forget any? I would like something more structured like
# `copy_attributes(from=body, to=config)`
+ conn_id=body.conn_id,
lists=body.lists,
room_subscriptions=body.room_subscriptions,
extensions=body.extensions,
@@ -929,7 +929,6 @@ class SlidingSyncRestServlet(RestServlet):
return 200, response_content
- # TODO: Is there a better way to encode things?
async def encode_response(
self,
requester: Requester,
@@ -1117,6 +1116,24 @@ class SlidingSyncRestServlet(RestServlet):
extensions.e2ee.device_list_updates.left
)
+ if extensions.account_data is not None:
+ serialized_extensions["account_data"] = {
+                # Same as the top-level `account_data.events` field in Sync v2.
+ "global": [
+ {"type": account_data_type, "content": content}
+ for account_data_type, content in extensions.account_data.global_account_data_map.items()
+ ],
+            # Same as the joined room's account_data field in Sync v2, e.g. the path
+ # `rooms.join["!foo:bar"].account_data.events`.
+ "rooms": {
+ room_id: [
+ {"type": account_data_type, "content": content}
+ for account_data_type, content in event_map.items()
+ ]
+ for room_id, event_map in extensions.account_data.account_data_by_room_map.items()
+ },
+ }
+
return serialized_extensions
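
For reference, the serializer above produces an extension shaped roughly like the following (the room ID, event types, and contents are made up; only the shape follows the code):

```python
serialized_extensions = {
    "account_data": {
        # Mirrors the top-level `account_data.events` field in Sync v2
        "global": [
            {"type": "m.push_rules", "content": {"global": {}}},
        ],
        # Mirrors `rooms.join["!foo:bar"].account_data.events` in Sync v2
        "rooms": {
            "!foo:bar": [
                {"type": "m.fully_read", "content": {"event_id": "$event"}},
            ],
        },
    },
}
```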
diff --git a/synapse/server.py b/synapse/server.py
index 4a3f9ff934..46b9d83a04 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -559,6 +559,7 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_sync_handler(self) -> SyncHandler:
return SyncHandler(self)
+ @cache_in_self
def get_sliding_sync_handler(self) -> SlidingSyncHandler:
return SlidingSyncHandler(self)
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 036972ac25..da3ebe66b8 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -26,6 +26,8 @@ import attr
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
+from synapse.storage.databases.main.stream import _filter_results_by_stream
+from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__)
@@ -156,3 +158,38 @@ class StateDeltasStore(SQLBaseStore):
"get_max_stream_id_in_current_state_deltas",
self._get_max_stream_id_in_current_state_deltas_txn,
)
+
+ async def get_current_state_deltas_for_room(
+ self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
+ ) -> List[StateDelta]:
+ """Get the state deltas between two tokens."""
+
+ def get_current_state_deltas_for_room_txn(
+ txn: LoggingTransaction,
+ ) -> List[StateDelta]:
+ sql = """
+ SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
+ FROM current_state_delta_stream
+ WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC
+ """
+ txn.execute(
+ sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
+ )
+
+ return [
+ StateDelta(
+ stream_id=row[1],
+ room_id=room_id,
+ event_type=row[2],
+ state_key=row[3],
+ event_id=row[4],
+ prev_event_id=row[5],
+ )
+ for row in txn
+ if _filter_results_by_stream(from_token, to_token, row[0], row[1])
+ ]
+
+ return await self.db_pool.runInteraction(
+ "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
+ )
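
The SQL above already bounds `stream_id`, so the extra `_filter_results_by_stream` pass may look redundant; it is needed because, with multiple event persisters, a `RoomStreamToken` can carry a separate position per writer instance, and the scalar SQL bounds are only an over-approximation. A simplified model of the per-instance check (toy types, not the real `RoomStreamToken`):

```python
from typing import Dict

import attr


@attr.s(auto_attribs=True, frozen=True)
class ToyToken:
    """Simplified stand-in for `RoomStreamToken`: a default position plus
    optional per-writer positions (real tokens also have a topological part).
    """

    stream: int
    instance_map: Dict[str, int] = attr.Factory(dict)

    def for_instance(self, name: str) -> int:
        return self.instance_map.get(name, self.stream)


def in_range(lower: ToyToken, upper: ToyToken, instance: str, stream_id: int) -> bool:
    # Same shape as `_filter_results_by_stream`: exclusive lower bound,
    # inclusive upper bound, both evaluated for the row's writer instance.
    return lower.for_instance(instance) < stream_id <= upper.for_instance(instance)


lower = ToyToken(stream=10, instance_map={"persister2": 13})
upper = ToyToken(stream=20)
assert in_range(lower, upper, "persister1", 11)  # past persister1's position
assert not in_range(lower, upper, "persister2", 12)  # persister2 already sent it
```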
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b034361aec..4207e73c7f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -2104,3 +2104,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return RoomStreamToken(stream=last_position.stream - 1)
return None
+
+ def get_rooms_that_might_have_updates(
+ self, room_ids: StrCollection, from_token: RoomStreamToken
+ ) -> StrCollection:
+ """Filters given room IDs down to those that might have updates, i.e.
+ removes rooms that definitely do not have updates.
+ """
+ return self._events_stream_cache.get_entities_changed(
+ room_ids, from_token.stream
+ )
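
`get_rooms_that_might_have_updates` leans on `StreamChangeCache`, which is deliberately pessimistic: it can answer precisely only for positions newer than its horizon, and otherwise returns every queried entity as "possibly changed", hence the "might" in the name. A toy model of that contract (not the real cache implementation):

```python
from typing import Dict, FrozenSet, Iterable


class ToyStreamChangeCache:
    """Bare-bones model of `StreamChangeCache.get_entities_changed`: precise
    after the cache's horizon, pessimistic before it."""

    def __init__(self, horizon: int) -> None:
        self._horizon = horizon
        self._last_change: Dict[str, int] = {}

    def entity_changed(self, entity: str, stream_pos: int) -> None:
        self._last_change[entity] = stream_pos

    def get_entities_changed(
        self, entities: Iterable[str], stream_pos: int
    ) -> FrozenSet[str]:
        if stream_pos < self._horizon:
            # The cache can't see this far back, so everything *might* have
            # changed; callers must treat the result as an over-approximation.
            return frozenset(entities)
        return frozenset(
            e for e in entities if self._last_change.get(e, 0) > stream_pos
        )


cache = ToyStreamChangeCache(horizon=100)
cache.entity_changed("!a:x", 150)
assert cache.get_entities_changed(["!a:x", "!b:x"], 120) == {"!a:x"}
assert cache.get_entities_changed(["!a:x", "!b:x"], 50) == {"!a:x", "!b:x"}
```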
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 59eb0963ee..f26cc0e903 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -35,6 +35,7 @@ from synapse.types import (
DeviceListUpdates,
JsonDict,
JsonMapping,
+ Requester,
SlidingSyncStreamToken,
StreamToken,
UserID,
@@ -109,7 +110,7 @@ class SlidingSyncConfig(SlidingSyncBody):
"""
user: UserID
- device_id: Optional[str]
+ requester: Requester
# Pydantic config
class Config:
@@ -237,6 +238,17 @@ class SlidingSyncResult:
notification_count: int
highlight_count: int
+ def __bool__(self) -> bool:
+ return (
+ # If this is the first time the client is seeing the room, we should not filter it out
+ # under any circumstance.
+ self.initial
+ # We need to let the client know if there are any new events
+ or bool(self.required_state)
+ or bool(self.timeline_events)
+ or bool(self.stripped_state)
+ )
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList:
"""
@@ -330,11 +342,31 @@ class SlidingSyncResult:
or self.device_unused_fallback_key_types
)
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class AccountDataExtension:
+ """The Account Data extension (MSC3959)
+
+ Attributes:
+ global_account_data_map: Mapping from `type` to `content` of global account
+ data events.
+ account_data_by_room_map: Mapping from room_id to mapping of `type` to
+ `content` of room account data events.
+ """
+
+ global_account_data_map: Mapping[str, JsonMapping]
+ account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
+
+ def __bool__(self) -> bool:
+ return bool(
+ self.global_account_data_map or self.account_data_by_room_map
+ )
+
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
+ account_data: Optional[AccountDataExtension] = None
def __bool__(self) -> bool:
- return bool(self.to_device or self.e2ee)
+ return bool(self.to_device or self.e2ee or self.account_data)
next_pos: SlidingSyncStreamToken
lists: Dict[str, SlidingWindowList]
@@ -346,7 +378,11 @@ class SlidingSyncResult:
to tell if the notifier needs to wait for more events when polling for
events.
"""
- return bool(self.lists or self.rooms or self.extensions)
+ # We don't include `self.lists` here, as a) `lists` is always non-empty even if
+ # there are no changes, and b) since we're sorting rooms by `stream_ordering` of
+ # the latest activity, anything that would cause the order to change would end
+ # up in `self.rooms` and cause us to send down the change.
+ return bool(self.rooms or self.extensions)
@staticmethod
def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
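
These `__bool__` hooks are what the long-poll loop keys off: it keeps rebuilding the response on each wake-up and returns early only once the result is truthy, which is why `lists` must be excluded above (it is always non-empty). A hedged sketch of that contract, with illustrative names rather than Synapse's actual notifier code:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def wait_for_nonempty(
    build_result: Callable[[], Awaitable[T]],
    woken: asyncio.Event,
    timeout: float,
) -> T:
    """Rebuild the response on every wake-up; return early once it's truthy."""
    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        result = await build_result()
        if result:  # e.g. SlidingSyncResult: truthy iff rooms/extensions changed
            return result
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            return result  # timed out: send the (empty) response anyway
        try:
            await asyncio.wait_for(woken.wait(), remaining)
            woken.clear()
        except asyncio.TimeoutError:
            pass
```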
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index f3c45a0d6a..dfe3b1e0f7 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel):
Sliding Sync API request body.
Attributes:
+ conn_id: An optional string to identify this connection to the server.
+ Only one sliding sync connection is allowed per given conn_id (empty
+ or not).
lists: Sliding window API. A map of list key to list information
(:class:`SlidingSyncList`). Max lists: 100. The list keys should be
arbitrary strings which the client is using to refer to the list. Keep this
@@ -322,8 +325,28 @@ class SlidingSyncBody(RequestBodyModel):
enabled: Optional[StrictBool] = False
+ class AccountDataExtension(RequestBodyModel):
+ """The Account Data extension (MSC3959)
+
+ Attributes:
+ enabled
+ lists: List of list keys (from the Sliding Window API) to apply this
+ extension to.
+ rooms: List of room IDs (from the Room Subscription API) to apply this
+ extension to.
+ """
+
+ enabled: Optional[StrictBool] = False
+ # Process all lists defined in the Sliding Window API. (This is the default.)
+ lists: Optional[List[StrictStr]] = ["*"]
+ # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+ rooms: Optional[List[StrictStr]] = ["*"]
+
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
+ account_data: Optional[AccountDataExtension] = None
+
+ conn_id: Optional[str]
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
if TYPE_CHECKING:
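
Putting the new request-side pieces together, a body that names the connection and enables the extension might look like this (the room ID and `conn_id` values are illustrative; the shape matches the `sync_body` dicts the tests below use):

```python
sync_body = {
    "conn_id": "main_sync_loop",
    "lists": {
        "foo-list": {
            "ranges": [[0, 99]],
            "required_state": [],
            "timeline_limit": 1,
        }
    },
    "extensions": {
        "account_data": {
            "enabled": True,
            "lists": ["*"],  # all lists (the default)
            "rooms": ["!foo:bar"],  # only this room subscription
        }
    },
}
```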
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 6c73f4ec33..5abf1041be 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -21,7 +21,7 @@
import json
import logging
from http import HTTPStatus
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple
from parameterized import parameterized, parameterized_class
@@ -59,15 +59,16 @@ from synapse.types import (
StreamToken,
UserID,
)
+from synapse.types.handlers import SlidingSyncConfig
from synapse.util import Clock
+from synapse.util.stringutils import random_string
from tests import unittest
from tests.federation.transport.test_knocking import (
KnockingStrippedStateEventHelperMixin,
)
-from tests.server import FakeChannel, TimedOutException
+from tests.server import TimedOutException
from tests.test_utils.event_injection import create_event, mark_event_as_partial_state
-from tests.unittest import skip_unless
logger = logging.getLogger(__name__)
@@ -1238,6 +1239,12 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
+ def default_config(self) -> JsonDict:
+ config = super().default_config()
+ # Enable sliding sync
+ config["experimental_features"] = {"msc3575_enabled": True}
+ return config
+
def do_sync(
self, sync_body: JsonDict, *, since: Optional[str] = None, tok: str
) -> Tuple[JsonDict, str]:
@@ -1268,6 +1275,88 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
return channel.json_body, channel.json_body["pos"]
+ def _bump_notifier_wait_for_events(
+ self,
+ user_id: str,
+ wake_stream_key: Literal[
+ StreamKeyType.ACCOUNT_DATA,
+ StreamKeyType.PRESENCE,
+ ],
+ ) -> None:
+ """
+        Wake up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
+ Sync results.
+
+ Args:
+ user_id: The user ID to wake up the notifier for
+ wake_stream_key: The stream key to wake up. This will create an actual new
+ entity in that stream so it's best to choose one that won't affect the
+                Sliding Sync results you're testing for. In other words, if you're testing
+ account data, choose `StreamKeyType.PRESENCE` instead. We support two
+ possible stream keys because you're probably testing one or the other so
+ one is always a "safe" option.
+ """
+ # We're expecting some new activity from this point onwards
+ from_token = self.hs.get_event_sources().get_current_token()
+
+ triggered_notifier_wait_for_events = False
+
+        async def _on_new_activity(
+ before_token: StreamToken, after_token: StreamToken
+ ) -> bool:
+ nonlocal triggered_notifier_wait_for_events
+ triggered_notifier_wait_for_events = True
+ return True
+
+ notifier = self.hs.get_notifier()
+
+ # Listen for some new activity for the user. We're just trying to confirm that
+ # our bump below actually does what we think it does (triggers new activity for
+ # the user).
+ result_awaitable = notifier.wait_for_events(
+ user_id,
+ 1000,
+            _on_new_activity,
+ from_token=from_token,
+ )
+
+ # Update the account data or presence so that `notifier.wait_for_events(...)`
+ # wakes up. We chose these two options because they're least likely to show up
+ # in the Sliding Sync response so it won't affect whether we have results.
+ if wake_stream_key == StreamKeyType.ACCOUNT_DATA:
+ self.get_success(
+ self.hs.get_account_data_handler().add_account_data_for_user(
+ user_id,
+ "org.matrix.foobarbaz",
+ {"foo": "bar"},
+ )
+ )
+ elif wake_stream_key == StreamKeyType.PRESENCE:
+ sending_user_id = self.register_user(
+ "user_bump_notifier_wait_for_events_" + random_string(10), "pass"
+ )
+ sending_user_tok = self.login(sending_user_id, "pass")
+ test_msg = {"foo": "bar"}
+ chan = self.make_request(
+ "PUT",
+ "/_matrix/client/r0/sendToDevice/m.test/1234",
+ content={"messages": {user_id: {"d1": test_msg}}},
+ access_token=sending_user_tok,
+ )
+ self.assertEqual(chan.code, 200, chan.result)
+ else:
+ raise AssertionError(
+ "Unable to wake that stream in _bump_notifier_wait_for_events(...)"
+ )
+
+ # Wait for our notifier result
+ self.get_success(result_awaitable)
+
+ if not triggered_notifier_wait_for_events:
+ raise AssertionError(
+ "Expected `notifier.wait_for_events(...)` to be triggered"
+ )
+
class SlidingSyncTestCase(SlidingSyncBase):
"""
@@ -1282,18 +1371,10 @@ class SlidingSyncTestCase(SlidingSyncBase):
devices.register_servlets,
]
- def default_config(self) -> JsonDict:
- config = super().default_config()
- # Enable sliding sync
- config["experimental_features"] = {"msc3575_enabled": True}
- return config
-
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
self.storage_controllers = hs.get_storage_controllers()
- self.account_data_handler = hs.get_account_data_handler()
- self.notifier = hs.get_notifier()
def _assertRequiredStateIncludes(
self,
@@ -1419,94 +1500,41 @@ class SlidingSyncTestCase(SlidingSyncBase):
return room_id
- def _bump_notifier_wait_for_events(self, user_id: str) -> None:
- """
- Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
- Sync results.
- """
- # We're expecting some new activity from this point onwards
- from_token = self.event_sources.get_current_token()
-
- triggered_notifier_wait_for_events = False
-
- async def _on_new_acivity(
- before_token: StreamToken, after_token: StreamToken
- ) -> bool:
- nonlocal triggered_notifier_wait_for_events
- triggered_notifier_wait_for_events = True
- return True
-
- # Listen for some new activity for the user. We're just trying to confirm that
- # our bump below actually does what we think it does (triggers new activity for
- # the user).
- result_awaitable = self.notifier.wait_for_events(
- user_id,
- 1000,
- _on_new_acivity,
- from_token=from_token,
- )
-
- # Update the account data so that `notifier.wait_for_events(...)` wakes up.
- # We're bumping account data because it won't show up in the Sliding Sync
- # response so it won't affect whether we have results.
- self.get_success(
- self.account_data_handler.add_account_data_for_user(
- user_id,
- "org.matrix.foobarbaz",
- {"foo": "bar"},
- )
- )
-
- # Wait for our notifier result
- self.get_success(result_awaitable)
-
- if not triggered_notifier_wait_for_events:
- raise AssertionError(
- "Expected `notifier.wait_for_events(...)` to be triggered"
- )
-
def test_sync_list(self) -> None:
"""
Test that room IDs show up in the Sliding Sync `lists`
"""
- alice_user_id = self.register_user("alice", "correcthorse")
- alice_access_token = self.login(alice_user_id, "correcthorse")
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
- room_id = self.helper.create_room_as(
- alice_user_id, tok=alice_access_token, is_public=True
- )
+ room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 99]],
- "required_state": [
- ["m.room.join_rules", ""],
- ["m.room.history_visibility", ""],
- ["m.space.child", "*"],
- ],
- "timeline_limit": 1,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 99]],
+ "required_state": [
+ ["m.room.join_rules", ""],
+ ["m.room.history_visibility", ""],
+ ["m.space.child", "*"],
+ ],
+ "timeline_limit": 1,
}
- },
- access_token=alice_access_token,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Make sure it has the foo-list we requested
self.assertListEqual(
- list(channel.json_body["lists"].keys()),
+ list(response_body["lists"].keys()),
["foo-list"],
- channel.json_body["lists"].keys(),
+ response_body["lists"].keys(),
)
# Make sure the list includes the room we are joined to
self.assertListEqual(
- list(channel.json_body["lists"]["foo-list"]["ops"]),
+ list(response_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
@@ -1514,15 +1542,15 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [room_id],
}
],
- channel.json_body["lists"]["foo-list"],
+ response_body["lists"]["foo-list"],
)
def test_wait_for_sync_token(self) -> None:
"""
Test that worker will wait until it catches up to the given token
"""
- alice_user_id = self.register_user("alice", "correcthorse")
- alice_access_token = self.login(alice_user_id, "correcthorse")
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
# Create a future token that will cause us to wait. Since we never send a new
# event to reach that future stream_ordering, the worker will wait until the
@@ -1540,23 +1568,24 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 99]],
+ "required_state": [
+ ["m.room.join_rules", ""],
+ ["m.room.history_visibility", ""],
+ ["m.space.child", "*"],
+ ],
+ "timeline_limit": 1,
+ }
+ }
+ }
channel = self.make_request(
"POST",
self.sync_endpoint + f"?pos={future_position_token_serialized}",
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 99]],
- "required_state": [
- ["m.room.join_rules", ""],
- ["m.room.history_visibility", ""],
- ["m.space.child", "*"],
- ],
- "timeline_limit": 1,
- }
- }
- },
- access_token=alice_access_token,
+ content=sync_body,
+ access_token=user1_tok,
await_result=False,
)
# Block for 10 seconds to make `notifier.wait_for_stream_token(from_token)`
@@ -1626,12 +1655,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
channel.json_body["rooms"][room_id]["timeline"],
)
- # TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to
- # check if there are any updates since the `from_token`.
- @skip_unless(
- False,
- "Once we remove ops from the Sliding Sync response, this test should pass",
- )
def test_wait_for_new_data_timeout(self) -> None:
"""
Test to make sure that the Sliding Sync request waits for new data to arrive but
@@ -1646,23 +1669,22 @@ class SlidingSyncTestCase(SlidingSyncBase):
room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id, user1_id, tok=user1_tok)
- from_token = self.event_sources.get_current_token()
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 0]],
+ "required_state": [],
+ "timeline_limit": 1,
+ }
+ }
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
- self.sync_endpoint
- + "?timeout=10000"
- + f"&pos={self.get_success(from_token.to_string(self.store))}",
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 0]],
- "required_state": [],
- "timeline_limit": 1,
- }
- }
- },
+ self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+ content=sync_body,
access_token=user1_tok,
await_result=False,
)
@@ -1671,7 +1693,9 @@ class SlidingSyncTestCase(SlidingSyncBase):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
- self._bump_notifier_wait_for_events(user1_id)
+ self._bump_notifier_wait_for_events(
+ user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+ )
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
@@ -1680,12 +1704,8 @@ class SlidingSyncTestCase(SlidingSyncBase):
channel.await_result(timeout_ms=1200)
self.assertEqual(channel.code, 200, channel.json_body)
- # We still see rooms because that's how Sliding Sync lists work but we reached
- # the timeout before seeing them
- self.assertEqual(
- [event["event_id"] for event in channel.json_body["rooms"].keys()],
- [room_id],
- )
+ # There should be no room sent down.
+ self.assertFalse(channel.json_body["rooms"])
def test_filter_list(self) -> None:
"""
@@ -1721,55 +1741,50 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.invite(invite_room_id, src=user2_id, targ=user1_id, tok=user2_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- # Absense of filters does not imply "False" values
- "all": {
- "ranges": [[0, 99]],
- "required_state": [],
- "timeline_limit": 1,
- "filters": {},
- },
- # Test single truthy filter
- "dms": {
- "ranges": [[0, 99]],
- "required_state": [],
- "timeline_limit": 1,
- "filters": {"is_dm": True},
- },
- # Test single falsy filter
- "non-dms": {
- "ranges": [[0, 99]],
- "required_state": [],
- "timeline_limit": 1,
- "filters": {"is_dm": False},
- },
- # Test how multiple filters should stack (AND'd together)
- "room-invites": {
- "ranges": [[0, 99]],
- "required_state": [],
- "timeline_limit": 1,
- "filters": {"is_dm": False, "is_invite": True},
- },
- }
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ sync_body = {
+ "lists": {
+                # Absence of filters does not imply "False" values
+ "all": {
+ "ranges": [[0, 99]],
+ "required_state": [],
+ "timeline_limit": 1,
+ "filters": {},
+ },
+ # Test single truthy filter
+ "dms": {
+ "ranges": [[0, 99]],
+ "required_state": [],
+ "timeline_limit": 1,
+ "filters": {"is_dm": True},
+ },
+ # Test single falsy filter
+ "non-dms": {
+ "ranges": [[0, 99]],
+ "required_state": [],
+ "timeline_limit": 1,
+ "filters": {"is_dm": False},
+ },
+ # Test how multiple filters should stack (AND'd together)
+ "room-invites": {
+ "ranges": [[0, 99]],
+ "required_state": [],
+ "timeline_limit": 1,
+ "filters": {"is_dm": False, "is_invite": True},
+ },
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Make sure it has the foo-list we requested
self.assertListEqual(
- list(channel.json_body["lists"].keys()),
+ list(response_body["lists"].keys()),
["all", "dms", "non-dms", "room-invites"],
- channel.json_body["lists"].keys(),
+ response_body["lists"].keys(),
)
# Make sure the lists have the correct rooms
self.assertListEqual(
- list(channel.json_body["lists"]["all"]["ops"]),
+ list(response_body["lists"]["all"]["ops"]),
[
{
"op": "SYNC",
@@ -1782,10 +1797,10 @@ class SlidingSyncTestCase(SlidingSyncBase):
],
}
],
- list(channel.json_body["lists"]["all"]),
+ list(response_body["lists"]["all"]),
)
self.assertListEqual(
- list(channel.json_body["lists"]["dms"]["ops"]),
+ list(response_body["lists"]["dms"]["ops"]),
[
{
"op": "SYNC",
@@ -1793,10 +1808,10 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [invited_dm_room_id, joined_dm_room_id],
}
],
- list(channel.json_body["lists"]["dms"]),
+ list(response_body["lists"]["dms"]),
)
self.assertListEqual(
- list(channel.json_body["lists"]["non-dms"]["ops"]),
+ list(response_body["lists"]["non-dms"]["ops"]),
[
{
"op": "SYNC",
@@ -1804,10 +1819,10 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [invite_room_id, room_id],
}
],
- list(channel.json_body["lists"]["non-dms"]),
+ list(response_body["lists"]["non-dms"]),
)
self.assertListEqual(
- list(channel.json_body["lists"]["room-invites"]["ops"]),
+ list(response_body["lists"]["room-invites"]["ops"]),
[
{
"op": "SYNC",
@@ -1815,14 +1830,14 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [invite_room_id],
}
],
- list(channel.json_body["lists"]["room-invites"]),
+ list(response_body["lists"]["room-invites"]),
)
# Ensure DM's are correctly marked
self.assertDictEqual(
{
room_id: room.get("is_dm")
- for room_id, room in channel.json_body["rooms"].items()
+ for room_id, room in response_body["rooms"].items()
},
{
invite_room_id: None,
@@ -1849,36 +1864,31 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id2, "activity in room2", tok=user1_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 99]],
- "required_state": [
- ["m.room.join_rules", ""],
- ["m.room.history_visibility", ""],
- ["m.space.child", "*"],
- ],
- "timeline_limit": 1,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 99]],
+ "required_state": [
+ ["m.room.join_rules", ""],
+ ["m.room.history_visibility", ""],
+ ["m.space.child", "*"],
+ ],
+ "timeline_limit": 1,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Make sure it has the foo-list we requested
self.assertListEqual(
- list(channel.json_body["lists"].keys()),
+ list(response_body["lists"].keys()),
["foo-list"],
- channel.json_body["lists"].keys(),
+ response_body["lists"].keys(),
)
# Make sure the list is sorted in the way we expect
self.assertListEqual(
- list(channel.json_body["lists"]["foo-list"]["ops"]),
+ list(response_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
@@ -1886,7 +1896,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [room_id2, room_id1, room_id3],
}
],
- channel.json_body["lists"]["foo-list"],
+ response_body["lists"]["foo-list"],
)
def test_sliced_windows(self) -> None:
@@ -1902,35 +1912,26 @@ class SlidingSyncTestCase(SlidingSyncBase):
room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
# Make the Sliding Sync request for a single room
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 0]],
- "required_state": [
- ["m.room.join_rules", ""],
- ["m.room.history_visibility", ""],
- ["m.space.child", "*"],
- ],
- "timeline_limit": 1,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 0]],
+ "required_state": [],
+ "timeline_limit": 1,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Make sure it has the foo-list we requested
self.assertListEqual(
- list(channel.json_body["lists"].keys()),
+ list(response_body["lists"].keys()),
["foo-list"],
- channel.json_body["lists"].keys(),
+ response_body["lists"].keys(),
)
# Make sure the list is sorted in the way we expect
self.assertListEqual(
- list(channel.json_body["lists"]["foo-list"]["ops"]),
+ list(response_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
@@ -1938,39 +1939,30 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [room_id3],
}
],
- channel.json_body["lists"]["foo-list"],
+ response_body["lists"]["foo-list"],
)
# Make the Sliding Sync request for the first two rooms
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- ["m.room.join_rules", ""],
- ["m.room.history_visibility", ""],
- ["m.space.child", "*"],
- ],
- "timeline_limit": 1,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 1,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Make sure it has the foo-list we requested
self.assertListEqual(
- list(channel.json_body["lists"].keys()),
+ list(response_body["lists"].keys()),
["foo-list"],
- channel.json_body["lists"].keys(),
+ response_body["lists"].keys(),
)
# Make sure the list is sorted in the way we expect
self.assertListEqual(
- list(channel.json_body["lists"]["foo-list"]["ops"]),
+ list(response_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
@@ -1978,7 +1970,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [room_id3, room_id2],
}
],
- channel.json_body["lists"]["foo-list"],
+ response_body["lists"]["foo-list"],
)
def test_rooms_meta_when_joined(self) -> None:
@@ -2009,43 +2001,38 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Reflect the current state of the room
self.assertEqual(
- channel.json_body["rooms"][room_id1]["name"],
+ response_body["rooms"][room_id1]["name"],
"my super room",
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["avatar"],
+ response_body["rooms"][room_id1]["avatar"],
"mxc://DUMMY_MEDIA_ID",
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["joined_count"],
+ response_body["rooms"][room_id1]["joined_count"],
2,
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["invited_count"],
+ response_body["rooms"][room_id1]["invited_count"],
0,
)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("is_dm"),
+ response_body["rooms"][room_id1].get("is_dm"),
)
def test_rooms_meta_when_invited(self) -> None:
@@ -2092,44 +2079,39 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# This should still reflect the current state of the room even when the user is
# invited.
self.assertEqual(
- channel.json_body["rooms"][room_id1]["name"],
+ response_body["rooms"][room_id1]["name"],
"my super duper room",
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["avatar"],
+ response_body["rooms"][room_id1]["avatar"],
"mxc://UPDATED_DUMMY_MEDIA_ID",
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["joined_count"],
+ response_body["rooms"][room_id1]["joined_count"],
1,
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["invited_count"],
+ response_body["rooms"][room_id1]["invited_count"],
1,
)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("is_dm"),
+ response_body["rooms"][room_id1].get("is_dm"),
)
def test_rooms_meta_when_banned(self) -> None:
@@ -2176,45 +2158,40 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Reflect the state of the room at the time of leaving
self.assertEqual(
- channel.json_body["rooms"][room_id1]["name"],
+ response_body["rooms"][room_id1]["name"],
"my super room",
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["avatar"],
+ response_body["rooms"][room_id1]["avatar"],
"mxc://DUMMY_MEDIA_ID",
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["joined_count"],
+ response_body["rooms"][room_id1]["joined_count"],
# FIXME: The actual number should be "1" (user2) but we currently don't
# support this for rooms where the user has left/been banned.
0,
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["invited_count"],
+ response_body["rooms"][room_id1]["invited_count"],
0,
)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("is_dm"),
+ response_body["rooms"][room_id1].get("is_dm"),
)
def test_rooms_meta_heroes(self) -> None:
@@ -2254,61 +2231,56 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.invite(room_id2, src=user2_id, targ=user3_id, tok=user2_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
        # Room1 has a name so we shouldn't see any `heroes`, which the client would
        # use to calculate the room name themselves.
self.assertEqual(
- channel.json_body["rooms"][room_id1]["name"],
+ response_body["rooms"][room_id1]["name"],
"my super room",
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("heroes"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("heroes"))
self.assertEqual(
- channel.json_body["rooms"][room_id1]["joined_count"],
+ response_body["rooms"][room_id1]["joined_count"],
2,
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["invited_count"],
+ response_body["rooms"][room_id1]["invited_count"],
1,
)
# Room2 doesn't have a name so we should see `heroes` populated
- self.assertIsNone(channel.json_body["rooms"][room_id2].get("name"))
+ self.assertIsNone(response_body["rooms"][room_id2].get("name"))
self.assertCountEqual(
[
hero["user_id"]
- for hero in channel.json_body["rooms"][room_id2].get("heroes", [])
+ for hero in response_body["rooms"][room_id2].get("heroes", [])
],
# Heroes shouldn't include the user themselves (we shouldn't see user1)
[user2_id, user3_id],
)
self.assertEqual(
- channel.json_body["rooms"][room_id2]["joined_count"],
+ response_body["rooms"][room_id2]["joined_count"],
2,
)
self.assertEqual(
- channel.json_body["rooms"][room_id2]["invited_count"],
+ response_body["rooms"][room_id2]["invited_count"],
1,
)
# We didn't request any state so we shouldn't see any `required_state`
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state"))
- self.assertIsNone(channel.json_body["rooms"][room_id2].get("required_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
+ self.assertIsNone(response_body["rooms"][room_id2].get("required_state"))
def test_rooms_meta_heroes_max(self) -> None:
"""
@@ -2347,44 +2319,39 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.join(room_id1, user7_id, tok=user7_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
        # Room1 doesn't have a name so we should see `heroes` populated
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("name"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("name"))
self.assertCountEqual(
[
hero["user_id"]
- for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
+ for hero in response_body["rooms"][room_id1].get("heroes", [])
],
# Heroes should be the first 5 users in the room (excluding the user
# themselves, we shouldn't see `user1`)
[user2_id, user3_id, user4_id, user5_id, user6_id],
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["joined_count"],
+ response_body["rooms"][room_id1]["joined_count"],
7,
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["invited_count"],
+ response_body["rooms"][room_id1]["invited_count"],
0,
)
# We didn't request any state so we shouldn't see any `required_state`
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
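
The `heroes` assertions above encode the room-summary rules: at most five members, never including the requesting user. A toy model of that selection, assuming heroes are taken in membership order (illustrative only):

from typing import List

def pick_heroes(members_in_join_order: List[str], me: str, limit: int = 5) -> List[str]:
    # Heroes never include the requesting user and are capped at `limit` (5)
    return [m for m in members_in_join_order if m != me][:limit]

# With seven joined users and user1 syncing, this yields user2..user6,
# matching the assertion in `test_rooms_meta_heroes_max` above.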
def test_rooms_meta_heroes_when_banned(self) -> None:
"""
@@ -2425,28 +2392,23 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.invite(room_id1, src=user2_id, targ=user5_id, tok=user2_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
        # Room1 doesn't have a name so we should see `heroes` populated
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("name"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("name"))
self.assertCountEqual(
[
hero["user_id"]
- for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
+ for hero in response_body["rooms"][room_id1].get("heroes", [])
],
# Heroes shouldn't include the user themselves (we shouldn't see user1). We
# also shouldn't see user4 since they joined after user1 was banned.
@@ -2457,13 +2419,13 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["joined_count"],
+ response_body["rooms"][room_id1]["joined_count"],
# FIXME: The actual number should be "1" (user2) but we currently don't
# support this for rooms where the user has left/been banned.
0,
)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["invited_count"],
+ response_body["rooms"][room_id1]["invited_count"],
# We shouldn't see user5 since they were invited after user1 was banned.
#
# FIXME: The actual number should be "1" (user3) but we currently don't
@@ -2496,46 +2458,41 @@ class SlidingSyncTestCase(SlidingSyncBase):
user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 3,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 3,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# We expect to saturate the `timeline_limit` (there are more than 3 messages in the room)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["limited"],
+ response_body["rooms"][room_id1]["limited"],
True,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
# Check to make sure the latest events are returned
self.assertEqual(
[
event["event_id"]
- for event in channel.json_body["rooms"][room_id1]["timeline"]
+ for event in response_body["rooms"][room_id1]["timeline"]
],
[
event_response4["event_id"],
event_response5["event_id"],
user1_join_response["event_id"],
],
- channel.json_body["rooms"][room_id1]["timeline"],
+ response_body["rooms"][room_id1]["timeline"],
)
# Check to make sure the `prev_batch` points at the right place
prev_batch_token = self.get_success(
StreamToken.from_string(
- self.store, channel.json_body["rooms"][room_id1]["prev_batch"]
+ self.store, response_body["rooms"][room_id1]["prev_batch"]
)
)
prev_batch_room_stream_token_serialized = self.get_success(
@@ -2559,9 +2516,9 @@ class SlidingSyncTestCase(SlidingSyncBase):
# With no `from_token` (initial sync), it's all historical since there is no
# "live" range
self.assertEqual(
- channel.json_body["rooms"][room_id1]["num_live"],
+ response_body["rooms"][room_id1]["num_live"],
0,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
def test_rooms_not_limited_initial_sync(self) -> None:
@@ -2582,44 +2539,39 @@ class SlidingSyncTestCase(SlidingSyncBase):
# Make the Sliding Sync request
timeline_limit = 100
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": timeline_limit,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": timeline_limit,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# The timeline should be `limited=False` because we have all of the events (no
# more to paginate to)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["limited"],
+ response_body["rooms"][room_id1]["limited"],
False,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
expected_number_of_events = 9
# We're just looking to make sure we got all of the events before hitting the `timeline_limit`
self.assertEqual(
- len(channel.json_body["rooms"][room_id1]["timeline"]),
+ len(response_body["rooms"][room_id1]["timeline"]),
expected_number_of_events,
- channel.json_body["rooms"][room_id1]["timeline"],
+ response_body["rooms"][room_id1]["timeline"],
)
self.assertLessEqual(expected_number_of_events, timeline_limit)
# With no `from_token` (initial sync), it's all historical since there is no
# "live" token range.
self.assertEqual(
- channel.json_body["rooms"][room_id1]["num_live"],
+ response_body["rooms"][room_id1]["num_live"],
0,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
def test_rooms_incremental_sync(self) -> None:
@@ -2637,7 +2589,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
# Make an initial Sliding Sync request to grab a token. This is also a sanity
# check that we can go from initial to incremental sync.
- sync_params = {
+ sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
@@ -2646,14 +2598,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
}
}
}
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- sync_params,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
- next_pos = channel.json_body["pos"]
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
# Send some events but don't send enough to saturate the `timeline_limit`.
-        # We want to later test that we only get the new events since the `next_pos`
+        # We want to later test that we only get the new events since the `from_token`
@@ -2661,41 +2606,35 @@ class SlidingSyncTestCase(SlidingSyncBase):
event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
# Make an incremental Sliding Sync request (what we're trying to test)
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={next_pos}",
- sync_params,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# We only expect to see the new events since the last sync which isn't enough to
# fill up the `timeline_limit`.
self.assertEqual(
- channel.json_body["rooms"][room_id1]["limited"],
+ response_body["rooms"][room_id1]["limited"],
False,
- f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} '
- + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
- + str(channel.json_body["rooms"][room_id1]),
+ f'Our `timeline_limit` was {sync_body["lists"]["foo-list"]["timeline_limit"]} '
+ + f'and {len(response_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+ + str(response_body["rooms"][room_id1]),
)
# Check to make sure the latest events are returned
self.assertEqual(
[
event["event_id"]
- for event in channel.json_body["rooms"][room_id1]["timeline"]
+ for event in response_body["rooms"][room_id1]["timeline"]
],
[
event_response2["event_id"],
event_response3["event_id"],
],
- channel.json_body["rooms"][room_id1]["timeline"],
+ response_body["rooms"][room_id1]["timeline"],
)
# All events are "live"
self.assertEqual(
- channel.json_body["rooms"][room_id1]["num_live"],
+ response_body["rooms"][room_id1]["num_live"],
2,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
def test_rooms_bump_stamp(self) -> None:
@@ -2740,33 +2679,27 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request
- timeline_limit = 100
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": timeline_limit,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 100,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Make sure it has the foo-list we requested
self.assertListEqual(
- list(channel.json_body["lists"].keys()),
+ list(response_body["lists"].keys()),
["foo-list"],
- channel.json_body["lists"].keys(),
+ response_body["lists"].keys(),
)
# Make sure the list includes the rooms in the right order
self.assertListEqual(
- list(channel.json_body["lists"]["foo-list"]["ops"]),
+ list(response_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
@@ -2776,22 +2709,22 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [room_id1, room_id2],
}
],
- channel.json_body["lists"]["foo-list"],
+ response_body["lists"]["foo-list"],
)
# The `bump_stamp` for room1 should point at the latest message (not the
# reaction since it's not one of the `DEFAULT_BUMP_EVENT_TYPES`)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["bump_stamp"],
+ response_body["rooms"][room_id1]["bump_stamp"],
event_pos1.stream,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
# The `bump_stamp` for room2 should point at the latest message
self.assertEqual(
- channel.json_body["rooms"][room_id2]["bump_stamp"],
+ response_body["rooms"][room_id2]["bump_stamp"],
event_pos2.stream,
- channel.json_body["rooms"][room_id2],
+ response_body["rooms"][room_id2],
)
def test_rooms_bump_stamp_backfill(self) -> None:
@@ -2895,23 +2828,18 @@ class SlidingSyncTestCase(SlidingSyncBase):
# Doing an SS request should return a positive `bump_stamp`, even though
        # the only event that matches the bump types has a negative stream
# ordering.
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- content={
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 5,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 5,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
- self.assertGreater(channel.json_body["rooms"][room_id]["bump_stamp"], 0)
+ self.assertGreater(response_body["rooms"][room_id]["bump_stamp"], 0)
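
These `bump_stamp` tests pin down two properties: the stamp follows the latest event whose type is in `DEFAULT_BUMP_EVENT_TYPES` (so a reaction does not bump the room), and it must stay positive even when the only matching event was backfilled with a negative stream ordering. A rough model of that rule (hypothetical helper; Synapse computes this from stored stream positions, not response JSON):

from typing import Iterable

def pick_bump_stamp(
    events_newest_first: Iterable[dict],
    bump_event_types: set,
    fallback: int,
) -> int:
    for event in events_newest_first:
        stream = event["stream_ordering"]
        # Backfilled events carry negative stream orderings; skip them so the
        # bump stamp never goes negative
        if event["type"] in bump_event_types and stream > 0:
            return stream
    # No usable bump event: fall back to some other positive position
    # (assumption: e.g. the user's join position)
    return fallback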
def test_rooms_newly_joined_incremental_sync(self) -> None:
"""
@@ -2958,28 +2886,22 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make an incremental Sliding Sync request (what we're trying to test)
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={from_token}",
- content=sync_body,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# We should see the new events and the rest should be filled with historical
# events which will make us `limited=True` since there are more to paginate to.
self.assertEqual(
- channel.json_body["rooms"][room_id1]["limited"],
+ response_body["rooms"][room_id1]["limited"],
True,
f"Our `timeline_limit` was {timeline_limit} "
- + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
- + str(channel.json_body["rooms"][room_id1]),
+ + f'and {len(response_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+ + str(response_body["rooms"][room_id1]),
)
# Check to make sure that the "live" and historical events are returned
self.assertEqual(
[
event["event_id"]
- for event in channel.json_body["rooms"][room_id1]["timeline"]
+ for event in response_body["rooms"][room_id1]["timeline"]
],
[
event_response2["event_id"],
@@ -2987,14 +2909,14 @@ class SlidingSyncTestCase(SlidingSyncBase):
event_response3["event_id"],
event_response4["event_id"],
],
- channel.json_body["rooms"][room_id1]["timeline"],
+ response_body["rooms"][room_id1]["timeline"],
)
# Only events after the `from_token` are "live" (join, event3, event4)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["num_live"],
+ response_body["rooms"][room_id1]["num_live"],
3,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
def test_rooms_invite_shared_history_initial_sync(self) -> None:
@@ -3031,51 +2953,46 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "activity after4", tok=user2_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 3,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 3,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# `timeline` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("timeline"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("timeline"),
+ response_body["rooms"][room_id1],
)
# `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("num_live"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("num_live"),
+ response_body["rooms"][room_id1],
)
# `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("limited"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("limited"),
+ response_body["rooms"][room_id1],
)
# `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("prev_batch"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("prev_batch"),
+ response_body["rooms"][room_id1],
)
# `required_state` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("required_state"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("required_state"),
+ response_body["rooms"][room_id1],
)
# We should have some `stripped_state` so the potential joiner can identify the
# room (we don't care about the order).
self.assertCountEqual(
- channel.json_body["rooms"][room_id1]["invite_state"],
+ response_body["rooms"][room_id1]["invite_state"],
[
{
"content": {"creator": user2_id, "room_version": "10"},
@@ -3102,7 +3019,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
"type": "m.room.member",
},
],
- channel.json_body["rooms"][room_id1]["invite_state"],
+ response_body["rooms"][room_id1]["invite_state"],
)
def test_rooms_invite_shared_history_incremental_sync(self) -> None:
@@ -3153,43 +3070,39 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={from_token}",
- content=sync_body,
- access_token=user1_tok,
+ response_body, from_token = self.do_sync(
+ sync_body, since=from_token, tok=user1_tok
)
- self.assertEqual(channel.code, 200, channel.json_body)
# `timeline` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("timeline"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("timeline"),
+ response_body["rooms"][room_id1],
)
# `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("num_live"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("num_live"),
+ response_body["rooms"][room_id1],
)
# `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("limited"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("limited"),
+ response_body["rooms"][room_id1],
)
# `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("prev_batch"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("prev_batch"),
+ response_body["rooms"][room_id1],
)
# `required_state` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("required_state"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("required_state"),
+ response_body["rooms"][room_id1],
)
# We should have some `stripped_state` so the potential joiner can identify the
# room (we don't care about the order).
self.assertCountEqual(
- channel.json_body["rooms"][room_id1]["invite_state"],
+ response_body["rooms"][room_id1]["invite_state"],
[
{
"content": {"creator": user2_id, "room_version": "10"},
@@ -3216,7 +3129,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
"type": "m.room.member",
},
],
- channel.json_body["rooms"][room_id1]["invite_state"],
+ response_body["rooms"][room_id1]["invite_state"],
)
def test_rooms_invite_world_readable_history_initial_sync(self) -> None:
@@ -3270,52 +3183,47 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "activity after4", tok=user2_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- # Large enough to see the latest events and before the invite
- "timeline_limit": 4,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+                # Large enough to see the latest events and those from before the invite
+ "timeline_limit": 4,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# `timeline` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("timeline"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("timeline"),
+ response_body["rooms"][room_id1],
)
# `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("num_live"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("num_live"),
+ response_body["rooms"][room_id1],
)
# `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("limited"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("limited"),
+ response_body["rooms"][room_id1],
)
# `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("prev_batch"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("prev_batch"),
+ response_body["rooms"][room_id1],
)
# `required_state` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("required_state"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("required_state"),
+ response_body["rooms"][room_id1],
)
# We should have some `stripped_state` so the potential joiner can identify the
# room (we don't care about the order).
self.assertCountEqual(
- channel.json_body["rooms"][room_id1]["invite_state"],
+ response_body["rooms"][room_id1]["invite_state"],
[
{
"content": {"creator": user2_id, "room_version": "10"},
@@ -3342,7 +3250,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
"type": "m.room.member",
},
],
- channel.json_body["rooms"][room_id1]["invite_state"],
+ response_body["rooms"][room_id1]["invite_state"],
)
def test_rooms_invite_world_readable_history_incremental_sync(self) -> None:
@@ -3410,44 +3318,38 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "activity after token5", tok=user2_tok)
self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
- # Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={from_token}",
- content=sync_body,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ # Make the incremental Sliding Sync request
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# `timeline` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("timeline"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("timeline"),
+ response_body["rooms"][room_id1],
)
# `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("num_live"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("num_live"),
+ response_body["rooms"][room_id1],
)
# `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("limited"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("limited"),
+ response_body["rooms"][room_id1],
)
# `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("prev_batch"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("prev_batch"),
+ response_body["rooms"][room_id1],
)
# `required_state` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("required_state"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("required_state"),
+ response_body["rooms"][room_id1],
)
# We should have some `stripped_state` so the potential joiner can identify the
# room (we don't care about the order).
self.assertCountEqual(
- channel.json_body["rooms"][room_id1]["invite_state"],
+ response_body["rooms"][room_id1]["invite_state"],
[
{
"content": {"creator": user2_id, "room_version": "10"},
@@ -3474,7 +3376,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
"type": "m.room.member",
},
],
- channel.json_body["rooms"][room_id1]["invite_state"],
+ response_body["rooms"][room_id1]["invite_state"],
)
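
The same five `assertIsNone` checks recur across all four invite-room tests above; a hypothetical helper on the test base class could factor them out:

from synapse.types import JsonDict

def assert_invite_room_shape(self, room_entry: JsonDict) -> None:
    # Invite rooms come down with `stripped_state` only: everything
    # timeline-related is omitted
    for omitted in ("timeline", "num_live", "limited", "prev_batch", "required_state"):
        self.assertIsNone(room_entry.get(omitted), room_entry)
    # ...but there must be enough `invite_state` to identify the room
    self.assertIn("invite_state", room_entry)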
def test_rooms_ban_initial_sync(self) -> None:
@@ -3502,47 +3404,42 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "activity after6", tok=user2_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [],
- "timeline_limit": 3,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 3,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# We should see events before the ban but not after
self.assertEqual(
[
event["event_id"]
- for event in channel.json_body["rooms"][room_id1]["timeline"]
+ for event in response_body["rooms"][room_id1]["timeline"]
],
[
event_response3["event_id"],
event_response4["event_id"],
user1_ban_response["event_id"],
],
- channel.json_body["rooms"][room_id1]["timeline"],
+ response_body["rooms"][room_id1]["timeline"],
)
# No "live" events in an initial sync (no `from_token` to define the "live"
# range)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["num_live"],
+ response_body["rooms"][room_id1]["num_live"],
0,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
# There are more events to paginate to
self.assertEqual(
- channel.json_body["rooms"][room_id1]["limited"],
+ response_body["rooms"][room_id1]["limited"],
True,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
def test_rooms_ban_incremental_sync1(self) -> None:
@@ -3582,39 +3479,33 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "activity after5", tok=user2_tok)
self.helper.send(room_id1, "activity after6", tok=user2_tok)
- # Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={from_token}",
- content=sync_body,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ # Make the incremental Sliding Sync request
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# We should see events before the ban but not after
self.assertEqual(
[
event["event_id"]
- for event in channel.json_body["rooms"][room_id1]["timeline"]
+ for event in response_body["rooms"][room_id1]["timeline"]
],
[
event_response3["event_id"],
event_response4["event_id"],
user1_ban_response["event_id"],
],
- channel.json_body["rooms"][room_id1]["timeline"],
+ response_body["rooms"][room_id1]["timeline"],
)
# All live events in the incremental sync
self.assertEqual(
- channel.json_body["rooms"][room_id1]["num_live"],
+ response_body["rooms"][room_id1]["num_live"],
3,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
        # There aren't any more events to paginate to in this range
self.assertEqual(
- channel.json_body["rooms"][room_id1]["limited"],
+ response_body["rooms"][room_id1]["limited"],
False,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
def test_rooms_ban_incremental_sync2(self) -> None:
@@ -3650,29 +3541,11 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "activity after4", tok=user2_tok)
- # Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={from_token}",
- content=sync_body,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ # Make the incremental Sliding Sync request
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# Nothing to see for this banned user in the room in the token range
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("timeline"))
- # No events returned in the timeline so nothing is "live"
- self.assertEqual(
- channel.json_body["rooms"][room_id1]["num_live"],
- 0,
- channel.json_body["rooms"][room_id1],
- )
- # There aren't anymore events to paginate to in this range
- self.assertEqual(
- channel.json_body["rooms"][room_id1]["limited"],
- False,
- channel.json_body["rooms"][room_id1],
- )
+ self.assertIsNone(response_body["rooms"].get(room_id1))
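
This hunk tightens the expectation from "room entry present but empty" to "room entry absent", in line with no longer sending empty room entries down the endpoint. A sketch of the server-side rule being asserted (a simplified model, not Synapse's actual implementation):

from synapse.types import JsonDict

def strip_empty_rooms(rooms: JsonDict) -> JsonDict:
    # A room with nothing to send (no timeline events, no state) carries no
    # information, so it is dropped from the response entirely rather than
    # serialized as an empty entry
    return {
        room_id: result
        for room_id, result in rooms.items()
        if result.get("timeline") or result.get("required_state")
    }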
def test_rooms_no_required_state(self) -> None:
"""
@@ -3687,27 +3560,22 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- # Empty `required_state`
- "required_state": [],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ # Empty `required_state`
+ "required_state": [],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# No `required_state` in response
self.assertIsNone(
- channel.json_body["rooms"][room_id1].get("required_state"),
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1].get("required_state"),
+ response_body["rooms"][room_id1],
)
def test_rooms_required_state_initial_sync(self) -> None:
@@ -3724,40 +3592,35 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [EventTypes.Create, ""],
- [EventTypes.RoomHistoryVisibility, ""],
- # This one doesn't exist in the room
- [EventTypes.Tombstone, ""],
- ],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ [EventTypes.RoomHistoryVisibility, ""],
+ # This one doesn't exist in the room
+ [EventTypes.Tombstone, ""],
+ ],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_incremental_sync(self) -> None:
"""
@@ -3782,37 +3645,73 @@ class SlidingSyncTestCase(SlidingSyncBase):
# This one doesn't exist in the room
[EventTypes.Tombstone, ""],
],
- "timeline_limit": 0,
+ "timeline_limit": 1,
+ }
+ }
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Send a message so the room comes down sync.
+ self.helper.send(room_id1, "msg", tok=user1_tok)
+
+ # Make the incremental Sliding Sync request
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # We only return state updates (rather than the full state), and only
+        # if we've sent the room down this connection before.
+ self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
+
+ def test_rooms_required_state_incremental_sync_restart(self) -> None:
+ """
+ Test `rooms.required_state` returns requested state events in the room during an
+ incremental sync, after a restart (and so the in memory caches are reset).
+        incremental sync, after a restart (and so the in-memory caches are reset).
+
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ [EventTypes.RoomHistoryVisibility, ""],
+ # This one doesn't exist in the room
+ [EventTypes.Tombstone, ""],
+ ],
+ "timeline_limit": 1,
}
}
}
- _, after_room_token = self.do_sync(sync_body, tok=user1_tok)
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Reset the in-memory cache
+ self.hs.get_sliding_sync_handler().connection_store._connections.clear()
# Make the Sliding Sync request
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={after_room_token}",
- content=sync_body,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+        # If the cache has been cleared then we do expect the full state to come down again
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
- # The returned state doesn't change from initial to incremental sync. In the
- # future, we will only return updates but only if we've sent the room down the
- # connection before.
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
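
Clearing `connection_store._connections` simulates a restart of the in-memory per-connection tracking, after which full state is expected again. A minimal model of that tracking (hypothetical shape; the real connection store keys off more than a bare connection ID):

from typing import Dict, Set

class ConnectionStore:
    """Remembers which rooms have been sent down each sliding sync connection."""

    def __init__(self) -> None:
        self._connections: Dict[str, Set[str]] = {}

    def have_sent_room(self, conn_id: str, room_id: str) -> bool:
        return room_id in self._connections.get(conn_id, set())

    def record_rooms_sent(self, conn_id: str, room_ids: Set[str]) -> None:
        self._connections.setdefault(conn_id, set()).update(room_ids)

When `have_sent_room(...)` is true, only state updates come down; after a restart (or clearing `_connections`, as the test does) the full requested state is sent again.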
def test_rooms_required_state_wildcard(self) -> None:
"""
@@ -3842,35 +3741,30 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request with wildcards for the `event_type` and `state_key`
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [StateValues.WILDCARD, StateValues.WILDCARD],
- ],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [StateValues.WILDCARD, StateValues.WILDCARD],
+ ],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
# We should see all the state events in the room
state_map.values(),
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_wildcard_event_type(self) -> None:
"""
@@ -3901,23 +3795,18 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request with wildcards for the `event_type`
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [StateValues.WILDCARD, user2_id],
- ],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [StateValues.WILDCARD, user2_id],
+ ],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
@@ -3925,7 +3814,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
        # We expect at least any state event with `user2_id` as the `state_key`
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user2_id)],
state_map[("org.matrix.foo_state", user2_id)],
@@ -3934,7 +3823,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
# events when the `event_type` is a wildcard.
exact=False,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_wildcard_state_key(self) -> None:
"""
@@ -3950,37 +3839,32 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request with wildcards for the `state_key`
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [EventTypes.Member, StateValues.WILDCARD],
- ],
- "timeline_limit": 0,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Member, StateValues.WILDCARD],
+ ],
+ "timeline_limit": 0,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
state_map[(EventTypes.Member, user2_id)],
},
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_lazy_loading_room_members(self) -> None:
"""
@@ -4003,24 +3887,19 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.send(room_id1, "3", tok=user2_tok)
# Make the Sliding Sync request with lazy loading for the room members
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [EventTypes.Create, ""],
- [EventTypes.Member, StateValues.LAZY],
- ],
- "timeline_limit": 3,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ [EventTypes.Member, StateValues.LAZY],
+ ],
+ "timeline_limit": 3,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
@@ -4028,7 +3907,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
        # Only user2 and user3 sent the 3 events we see in the `timeline`, so
        # only their membership events are lazy-loaded
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.Member, user2_id)],
@@ -4036,7 +3915,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
},
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_me(self) -> None:
"""
@@ -4076,25 +3955,20 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request with a request for '$ME'.
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [EventTypes.Create, ""],
- [EventTypes.Member, StateValues.ME],
- ["org.matrix.foo", StateValues.ME],
- ],
- "timeline_limit": 3,
- }
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ [EventTypes.Member, StateValues.ME],
+ ["org.matrix.foo", StateValues.ME],
+ ],
+ "timeline_limit": 3,
}
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
@@ -4102,7 +3976,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
        # The `$ME` state keys should resolve to the requesting user (user1)
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.Member, user1_id)],
@@ -4110,7 +3984,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
},
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
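
The wildcard, lazy-loading, and `$ME` tests above exercise the special `required_state` sentinels from MSC3575, referenced here via `StateValues`. Their likely definitions (an assumption based on the MSC's conventions):

class StateValues:
    # Match any event type and/or any state key
    WILDCARD = "*"
    # Lazy-load the membership events of users who appear in the timeline
    LAZY = "$LAZY"
    # Substitute the requesting user's own user ID as the state key
    ME = "$ME"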
@parameterized.expand([(Membership.LEAVE,), (Membership.BAN,)])
def test_rooms_required_state_leave_ban(self, stop_membership: str) -> None:
@@ -4173,17 +4047,11 @@ class SlidingSyncTestCase(SlidingSyncBase):
self.helper.leave(room_id1, user3_id, tok=user3_tok)
# Make the Sliding Sync request with lazy loading for the room members
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={from_token}",
- content=sync_body,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
        # We should see the state of the room at the time of the leave/ban
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.Member, user1_id)],
@@ -4193,7 +4061,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
},
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_combine_superset(self) -> None:
"""
@@ -4223,45 +4091,40 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request with wildcards for the `state_key`
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [EventTypes.Create, ""],
- [EventTypes.Member, user1_id],
- ],
- "timeline_limit": 0,
- },
- "bar-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [EventTypes.Member, StateValues.WILDCARD],
- ["org.matrix.foo_state", ""],
- ],
- "timeline_limit": 0,
- },
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ [EventTypes.Member, user1_id],
+ ],
+ "timeline_limit": 0,
},
- "room_subscriptions": {
- room_id1: {
- "required_state": [["org.matrix.bar_state", ""]],
- "timeline_limit": 0,
- }
+ "bar-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Member, StateValues.WILDCARD],
+ ["org.matrix.foo_state", ""],
+ ],
+ "timeline_limit": 0,
},
},
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [["org.matrix.bar_state", ""]],
+ "timeline_limit": 0,
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.Member, user1_id)],
@@ -4271,7 +4134,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
},
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
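
This test asserts that when several lists and a room subscription all match the same room, their `required_state` filters are combined into a superset rather than applied separately. A sketch of that union (illustrative; the real combination also has to account for wildcard entries subsuming specific ones):

from typing import Iterable, List, Set, Tuple

StateKey = Tuple[str, str]  # (event_type, state_key)

def combine_required_state(filters: Iterable[List[StateKey]]) -> Set[StateKey]:
    combined: Set[StateKey] = set()
    for required_state in filters:
        # Every (event_type, state_key) pair requested by any matching list
        # or room subscription is included once
        combined.update(required_state)
    return combined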
def test_rooms_required_state_partial_state(self) -> None:
"""
@@ -4294,28 +4157,23 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request (NOT lazy-loading room members)
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [EventTypes.Create, ""],
- ],
- "timeline_limit": 0,
- },
- }
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ ],
+ "timeline_limit": 0,
+ },
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Make sure the list includes room1 but room2 is excluded because it's still
# partially-stated
self.assertListEqual(
- list(channel.json_body["lists"]["foo-list"]["ops"]),
+ list(response_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
@@ -4323,33 +4181,28 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [room_id1],
}
],
- channel.json_body["lists"]["foo-list"],
+ response_body["lists"]["foo-list"],
)
# Make the Sliding Sync request (with lazy-loading room members)
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {
- "foo-list": {
- "ranges": [[0, 1]],
- "required_state": [
- [EventTypes.Create, ""],
- # Lazy-load room members
- [EventTypes.Member, StateValues.LAZY],
- ],
- "timeline_limit": 0,
- },
- }
- },
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ # Lazy-load room members
+ [EventTypes.Member, StateValues.LAZY],
+ ],
+ "timeline_limit": 0,
+ },
+ }
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# The list should include both rooms now because we're lazy-loading room members
self.assertListEqual(
- list(channel.json_body["lists"]["foo-list"]["ops"]),
+ list(response_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
@@ -4357,7 +4210,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
"room_ids": [room_id2, room_id1],
}
],
- channel.json_body["lists"]["foo-list"],
+ response_body["lists"]["foo-list"],
)
def test_room_subscriptions_with_join_membership(self) -> None:
@@ -4374,22 +4227,17 @@ class SlidingSyncTestCase(SlidingSyncBase):
join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request with just the room subscription
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "room_subscriptions": {
- room_id1: {
- "required_state": [
- [EventTypes.Create, ""],
- ],
- "timeline_limit": 1,
- }
- },
+ sync_body = {
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [
+ [EventTypes.Create, ""],
+ ],
+ "timeline_limit": 1,
+ }
},
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
@@ -4397,37 +4245,37 @@ class SlidingSyncTestCase(SlidingSyncBase):
# We should see some state
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
},
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
# We should see some events
self.assertEqual(
[
event["event_id"]
- for event in channel.json_body["rooms"][room_id1]["timeline"]
+ for event in response_body["rooms"][room_id1]["timeline"]
],
[
join_response["event_id"],
],
- channel.json_body["rooms"][room_id1]["timeline"],
+ response_body["rooms"][room_id1]["timeline"],
)
# No "live" events in an initial sync (no `from_token` to define the "live"
# range)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["num_live"],
+ response_body["rooms"][room_id1]["num_live"],
0,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
# There are more events to paginate to
self.assertEqual(
- channel.json_body["rooms"][room_id1]["limited"],
+ response_body["rooms"][room_id1]["limited"],
True,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
def test_room_subscriptions_with_leave_membership(self) -> None:
@@ -4468,57 +4316,52 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request with just the room subscription
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "room_subscriptions": {
- room_id1: {
- "required_state": [
- ["org.matrix.foo_state", ""],
- ],
- "timeline_limit": 2,
- }
- },
+ sync_body = {
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [
+ ["org.matrix.foo_state", ""],
+ ],
+ "timeline_limit": 2,
+ }
},
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# We should see the state at the time of the leave
self._assertRequiredStateIncludes(
- channel.json_body["rooms"][room_id1]["required_state"],
+ response_body["rooms"][room_id1]["required_state"],
{
state_map[("org.matrix.foo_state", "")],
},
exact=True,
)
- self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+ self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
        # We should see some events from before we left (nothing after)
self.assertEqual(
[
event["event_id"]
- for event in channel.json_body["rooms"][room_id1]["timeline"]
+ for event in response_body["rooms"][room_id1]["timeline"]
],
[
join_response["event_id"],
leave_response["event_id"],
],
- channel.json_body["rooms"][room_id1]["timeline"],
+ response_body["rooms"][room_id1]["timeline"],
)
# No "live" events in an initial sync (no `from_token` to define the "live"
# range)
self.assertEqual(
- channel.json_body["rooms"][room_id1]["num_live"],
+ response_body["rooms"][room_id1]["num_live"],
0,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
# There are more events to paginate to
self.assertEqual(
- channel.json_body["rooms"][room_id1]["limited"],
+ response_body["rooms"][room_id1]["limited"],
True,
- channel.json_body["rooms"][room_id1],
+ response_body["rooms"][room_id1],
)
def test_room_subscriptions_no_leak_private_room(self) -> None:
@@ -4539,27 +4382,20 @@ class SlidingSyncTestCase(SlidingSyncBase):
)
# Make the Sliding Sync request with just the room subscription
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "room_subscriptions": {
- room_id1: {
- "required_state": [
- [EventTypes.Create, ""],
- ],
- "timeline_limit": 1,
- }
- },
+ sync_body = {
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [
+ [EventTypes.Create, ""],
+ ],
+ "timeline_limit": 1,
+ }
},
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# We should not see the room at all (we're not in it)
- self.assertIsNone(
- channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
- )
+ self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"])
def test_room_subscriptions_world_readable(self) -> None:
"""
@@ -4602,111 +4438,506 @@ class SlidingSyncTestCase(SlidingSyncBase):
# Note: We never join the room
# Make the Sliding Sync request with just the room subscription
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "room_subscriptions": {
- room_id1: {
- "required_state": [
- [EventTypes.Create, ""],
- ],
- "timeline_limit": 1,
- }
- },
+ sync_body = {
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [
+ [EventTypes.Create, ""],
+ ],
+ "timeline_limit": 1,
+ }
},
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# FIXME: In the future, we should be able to see the room because it's
# `world_readable` but currently we don't support this.
- self.assertIsNone(
- channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
+ self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"])
+
+ def test_rooms_required_state_incremental_sync_LIVE(self) -> None:
+ """Test that we only get state updates in incremental sync for rooms
+ we've already seen (LIVE).
+ """
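+        # (Sketch of the model these tests assume, based on the test names:
+        # per connection, a room is LIVE if the client is fully up-to-date
+        # with it, PREVIOUSLY if it was sent down before but some updates were
+        # missed, and NEVER if it has never been sent down the connection.)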
+
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ # Make the Sliding Sync request
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ [EventTypes.RoomHistoryVisibility, ""],
+ # This one doesn't exist in the room
+ [EventTypes.Name, ""],
+ ],
+ "timeline_limit": 0,
+ }
+ }
+ }
+
+ response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(room_id1)
)
+ self._assertRequiredStateIncludes(
+ response_body["rooms"][room_id1]["required_state"],
+ {
+ state_map[(EventTypes.Create, "")],
+ state_map[(EventTypes.RoomHistoryVisibility, "")],
+ },
+ exact=True,
+ )
-class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
- """Tests for the to-device sliding sync extension"""
+ # Send a state event
+ self.helper.send_state(
+ room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok
+ )
- servlets = [
- synapse.rest.admin.register_servlets,
- login.register_servlets,
- sync.register_servlets,
- sendtodevice.register_servlets,
- ]
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
- def default_config(self) -> JsonDict:
- config = super().default_config()
- # Enable sliding sync
- config["experimental_features"] = {"msc3575_enabled": True}
- return config
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(room_id1)
+ )
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.store = hs.get_datastores().main
- self.event_sources = hs.get_event_sources()
- self.account_data_handler = hs.get_account_data_handler()
- self.notifier = hs.get_notifier()
- self.sync_endpoint = (
- "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
+ self.assertNotIn("initial", response_body["rooms"][room_id1])
+ self._assertRequiredStateIncludes(
+ response_body["rooms"][room_id1]["required_state"],
+ {
+ state_map[(EventTypes.Name, "")],
+ },
+ exact=True,
)
- def _bump_notifier_wait_for_events(self, user_id: str) -> None:
+ @parameterized.expand([(False,), (True,)])
+ def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None:
"""
- Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
- Sync results.
+        Test getting room data where we have previously sent down the room, but
+        we missed sending down some timeline events, so its status is
+        considered PREVIOUSLY.
+
+        There are two versions of this test: one where there are more messages
+        than the timeline limit, and one where there aren't.
"""
- # We're expecting some new activity from this point onwards
- from_token = self.event_sources.get_current_token()
- triggered_notifier_wait_for_events = False
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
- async def _on_new_acivity(
- before_token: StreamToken, after_token: StreamToken
- ) -> bool:
- nonlocal triggered_notifier_wait_for_events
- triggered_notifier_wait_for_events = True
- return True
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
- # Listen for some new activity for the user. We're just trying to confirm that
- # our bump below actually does what we think it does (triggers new activity for
- # the user).
- result_awaitable = self.notifier.wait_for_events(
- user_id,
- 1000,
- _on_new_acivity,
- from_token=from_token,
+ self.helper.send(room_id1, "msg", tok=user1_tok)
+
+ timeline_limit = 5
+ conn_id = "conn_id"
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 0]],
+ "required_state": [],
+ "timeline_limit": timeline_limit,
+ }
+ },
+            "conn_id": conn_id,
+ }
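+        # (`conn_id` identifies this sliding sync connection; the per-room
+        # sent/un-sent bookkeeping exercised below is tracked per connection.)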
+
+ # The first room gets sent down the initial sync
+ response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok)
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
)
- # Update the account data so that `notifier.wait_for_events(...)` wakes up.
- # We're bumping account data because it won't show up in the Sliding Sync
- # response so it won't affect whether we have results.
- self.get_success(
- self.account_data_handler.add_account_data_for_user(
- user_id,
- "org.matrix.foobarbaz",
- {"foo": "bar"},
+ # We now send down some events in room1 (depending on the test param).
+ expected_events = [] # The set of events in the timeline
+ if limited:
+ for _ in range(10):
+ resp = self.helper.send(room_id1, "msg1", tok=user1_tok)
+ expected_events.append(resp["event_id"])
+ else:
+ resp = self.helper.send(room_id1, "msg1", tok=user1_tok)
+ expected_events.append(resp["event_id"])
+
+        # A message happens in the other room, so room1 won't get sent down.
+ self.helper.send(room_id2, "msg", tok=user1_tok)
+
+ # Only the second room gets sent down sync.
+ response_body, from_token = self.do_sync(
+ sync_body, since=initial_from_token, tok=user1_tok
+ )
+
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
+ )
+
+ # FIXME: This is a hack to record that the first room wasn't sent down
+ # sync, as we don't implement that currently.
+ sliding_sync_handler = self.hs.get_sliding_sync_handler()
+ requester = self.get_success(
+ self.hs.get_auth().get_user_by_access_token(user1_tok)
+ )
+ sync_config = SlidingSyncConfig(
+ user=requester.user,
+ requester=requester,
+ conn_id=conn_id,
+ )
+
+ parsed_initial_from_token = self.get_success(
+ SlidingSyncStreamToken.from_string(self.store, initial_from_token)
+ )
+ connection_position = self.get_success(
+ sliding_sync_handler.connection_store.record_rooms(
+ sync_config,
+ parsed_initial_from_token,
+ sent_room_ids=[],
+ unsent_room_ids=[room_id1],
)
)
- # Wait for our notifier result
- self.get_success(result_awaitable)
+        # FIXME: Now fix up `from_token` with the new connection position from above.
+ parsed_from_token = self.get_success(
+ SlidingSyncStreamToken.from_string(self.store, from_token)
+ )
+ parsed_from_token = SlidingSyncStreamToken(
+ stream_token=parsed_from_token.stream_token,
+ connection_position=connection_position,
+ )
+ from_token = self.get_success(parsed_from_token.to_string(self.store))
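+        # (The connection position returned by `record_rooms` has to be baked
+        # into the token we present on the next request, otherwise the server
+        # has no record that room1 is still un-sent on this connection.)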
- if not triggered_notifier_wait_for_events:
- raise AssertionError(
- "Expected `notifier.wait_for_events(...)` to be triggered"
+ # We now send another event to room1, so we should sync all the missing events.
+ resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
+ expected_events.append(resp["event_id"])
+
+ # This sync should contain the messages from room1 not yet sent down.
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+ )
+ self.assertNotIn("initial", response_body["rooms"][room_id1])
+
+ self.assertEqual(
+ [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]],
+ expected_events[-timeline_limit:],
+ )
+ self.assertEqual(response_body["rooms"][room_id1]["limited"], limited)
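+        # (`limited` is only expected when more events happened than
+        # `timeline_limit` could carry, i.e. the 10-message variant.)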
+ self.assertEqual(response_body["rooms"][room_id1].get("required_state"), None)
+
+ def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None:
+ """
+ Test getting room data where we have previously sent down the room, but
+ we missed sending down some state previously and so its status is
+ considered PREVIOUSLY.
+ """
+
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+ self.helper.send(room_id1, "msg", tok=user1_tok)
+
+ conn_id = "conn_id"
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 0]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ [EventTypes.RoomHistoryVisibility, ""],
+ # This one doesn't exist in the room
+ [EventTypes.Name, ""],
+ ],
+ "timeline_limit": 0,
+ }
+ },
+            "conn_id": conn_id,
+ }
+
+ # The first room gets sent down the initial sync
+ response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok)
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+ )
+
+ # We now send down some state in room1
+ resp = self.helper.send_state(
+ room_id1, EventTypes.Name, {"name": "foo"}, tok=user1_tok
+ )
+ name_change_id = resp["event_id"]
+
+        # A message happens in the other room, so room1 won't get sent down.
+ self.helper.send(room_id2, "msg", tok=user1_tok)
+
+ # Only the second room gets sent down sync.
+ response_body, from_token = self.do_sync(
+ sync_body, since=initial_from_token, tok=user1_tok
+ )
+
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
+ )
+
+ # FIXME: This is a hack to record that the first room wasn't sent down
+ # sync, as we don't implement that currently.
+ sliding_sync_handler = self.hs.get_sliding_sync_handler()
+ requester = self.get_success(
+ self.hs.get_auth().get_user_by_access_token(user1_tok)
+ )
+ sync_config = SlidingSyncConfig(
+ user=requester.user,
+ requester=requester,
+ conn_id=conn_id,
+ )
+
+ parsed_initial_from_token = self.get_success(
+ SlidingSyncStreamToken.from_string(self.store, initial_from_token)
+ )
+ connection_position = self.get_success(
+ sliding_sync_handler.connection_store.record_rooms(
+ sync_config,
+ parsed_initial_from_token,
+ sent_room_ids=[],
+ unsent_room_ids=[room_id1],
)
+ )
+
+        # FIXME: Now fix up `from_token` with the new connection position from above.
+ parsed_from_token = self.get_success(
+ SlidingSyncStreamToken.from_string(self.store, from_token)
+ )
+ parsed_from_token = SlidingSyncStreamToken(
+ stream_token=parsed_from_token.stream_token,
+ connection_position=connection_position,
+ )
+ from_token = self.get_success(parsed_from_token.to_string(self.store))
+
+ # We now send another event to room1, so we should sync all the missing state.
+ self.helper.send(room_id1, "msg", tok=user1_tok)
+
+ # This sync should contain the state changes from room1.
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+ )
+ self.assertNotIn("initial", response_body["rooms"][room_id1])
+
+ # We should only see the name change.
+ self.assertEqual(
+ [
+ ev["event_id"]
+ for ev in response_body["rooms"][room_id1]["required_state"]
+ ],
+ [name_change_id],
+ )
+
+ def test_rooms_required_state_incremental_sync_NEVER(self) -> None:
+ """
+ Test getting `required_state` where we have NEVER sent down the room before
+ """
+
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+ self.helper.send(room_id1, "msg", tok=user1_tok)
+
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 0]],
+ "required_state": [
+ [EventTypes.Create, ""],
+ [EventTypes.RoomHistoryVisibility, ""],
+ # This one doesn't exist in the room
+ [EventTypes.Name, ""],
+ ],
+ "timeline_limit": 1,
+ }
+ },
+ }
+
+ # A message happens in the other room, so room1 won't get sent down.
+ self.helper.send(room_id2, "msg", tok=user1_tok)
+
+ # Only the second room gets sent down sync.
+ response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
+ )
+
+ # We now send another event to room1, so we should send down the full
+ # room.
+ self.helper.send(room_id1, "msg2", tok=user1_tok)
+
+ # This sync should contain the messages from room1 not yet sent down.
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+ )
+
+ self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
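+        # (A room we have NEVER sent down before comes down like an initial
+        # sync for that room, hence the `initial` flag.)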
+
+ state_map = self.get_success(
+ self.storage_controllers.state.get_current_state(room_id1)
+ )
+
+ self._assertRequiredStateIncludes(
+ response_body["rooms"][room_id1]["required_state"],
+ {
+ state_map[(EventTypes.Create, "")],
+ state_map[(EventTypes.RoomHistoryVisibility, "")],
+ },
+ exact=True,
+ )
+
+ def test_rooms_timeline_incremental_sync_NEVER(self) -> None:
+ """
+ Test getting timeline room data where we have NEVER sent down the room
+ before
+ """
+
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 0]],
+ "required_state": [],
+ "timeline_limit": 5,
+ }
+ },
+ }
+
+ expected_events = []
+ for _ in range(4):
+ resp = self.helper.send(room_id1, "msg", tok=user1_tok)
+ expected_events.append(resp["event_id"])
+
+ # A message happens in the other room, so room1 won't get sent down.
+ self.helper.send(room_id2, "msg", tok=user1_tok)
+
+ # Only the second room gets sent down sync.
+ response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
+ )
+
+ # We now send another event to room1 so it comes down sync
+ resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
+ expected_events.append(resp["event_id"])
+
+ # This sync should contain the messages from room1 not yet sent down.
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ self.assertCountEqual(
+ response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+ )
+
+ self.assertEqual(
+ [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]],
+ expected_events,
+ )
+ self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
+ self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
+
+ def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None:
+ """
+        Test that rooms with no updates are not returned in subsequent
+        incremental syncs.
+ """
+
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ }
+ }
+
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Make the incremental Sliding Sync request
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ # Nothing has happened in the room, so the room should not come down
+ # /sync.
+ self.assertIsNone(response_body["rooms"].get(room_id1))
+
+ def test_empty_initial_room_comes_down_sync(self) -> None:
+ """
+ Test that rooms come down /sync even with empty required state and
+ timeline limit in initial sync.
+ """
+
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+ sync_body = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ }
+ }
+
+ # Make the Sliding Sync request
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+ self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
+
+
+class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
+ """Tests for the to-device sliding sync extension"""
+
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ login.register_servlets,
+ sync.register_servlets,
+ sendtodevice.register_servlets,
+ ]
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.store = hs.get_datastores().main
def _assert_to_device_response(
- self, channel: FakeChannel, expected_messages: List[JsonDict]
+ self, response_body: JsonDict, expected_messages: List[JsonDict]
) -> str:
"""Assert the sliding sync response was successful and has the expected
to-device messages.
Returns the next_batch token from the to-device section.
"""
- self.assertEqual(channel.code, 200, channel.json_body)
- extensions = channel.json_body["extensions"]
+ extensions = response_body["extensions"]
to_device = extensions["to_device"]
self.assertIsInstance(to_device["next_batch"], str)
self.assertEqual(to_device["events"], expected_messages)
@@ -4720,22 +4951,18 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {},
- "extensions": {
- "to_device": {
- "enabled": True,
- }
- },
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "to_device": {
+ "enabled": True,
+ }
},
- access_token=user1_tok,
- )
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# We expect no to-device messages
- self._assert_to_device_response(channel, [])
+ self._assert_to_device_response(response_body, [])
def test_data_initial_sync(self) -> None:
"""Test that we get to-device messages when we don't specify a since
@@ -4756,21 +4983,17 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
)
self.assertEqual(chan.code, 200, chan.result)
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {},
- "extensions": {
- "to_device": {
- "enabled": True,
- }
- },
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "to_device": {
+ "enabled": True,
+ }
},
- access_token=user1_tok,
- )
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
self._assert_to_device_response(
- channel,
+ response_body,
[{"content": test_msg, "sender": user2_id, "type": "m.test"}],
)
@@ -4782,21 +5005,17 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
user2_id = self.register_user("u2", "pass")
user2_tok = self.login(user2_id, "pass", "d2")
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {},
- "extensions": {
- "to_device": {
- "enabled": True,
- }
- },
+ sync_body: JsonDict = {
+ "lists": {},
+ "extensions": {
+ "to_device": {
+ "enabled": True,
+ }
},
- access_token=user1_tok,
- )
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# No to-device messages yet.
- next_batch = self._assert_to_device_response(channel, [])
+ next_batch = self._assert_to_device_response(response_body, [])
test_msg = {"foo": "bar"}
chan = self.make_request(
@@ -4807,59 +5026,47 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
)
self.assertEqual(chan.code, 200, chan.result)
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {},
- "extensions": {
- "to_device": {
- "enabled": True,
- "since": next_batch,
- }
- },
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "to_device": {
+ "enabled": True,
+ "since": next_batch,
+ }
},
- access_token=user1_tok,
- )
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
next_batch = self._assert_to_device_response(
- channel,
+ response_body,
[{"content": test_msg, "sender": user2_id, "type": "m.test"}],
)
# The next sliding sync request should not include the to-device
# message.
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {},
- "extensions": {
- "to_device": {
- "enabled": True,
- "since": next_batch,
- }
- },
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "to_device": {
+ "enabled": True,
+ "since": next_batch,
+ }
},
- access_token=user1_tok,
- )
- self._assert_to_device_response(channel, [])
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+ self._assert_to_device_response(response_body, [])
# An initial sliding sync request should not include the to-device
# message, as it should have been deleted
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {},
- "extensions": {
- "to_device": {
- "enabled": True,
- }
- },
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "to_device": {
+ "enabled": True,
+ }
},
- access_token=user1_tok,
- )
- self._assert_to_device_response(channel, [])
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+ self._assert_to_device_response(response_body, [])
def test_wait_for_new_data(self) -> None:
"""
@@ -4909,7 +5116,7 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
self.assertEqual(channel.code, 200, channel.json_body)
self._assert_to_device_response(
- channel,
+ channel.json_body,
[{"content": test_msg, "sender": user2_id, "type": "m.test"}],
)
@@ -4945,7 +5152,9 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
- self._bump_notifier_wait_for_events(user1_id)
+ self._bump_notifier_wait_for_events(
+ user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+ )
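+        # (Account data is used as the wake-up here because it does not appear
+        # in the to-device extension response, so it must not count as new
+        # results for this request.)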
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
@@ -4954,7 +5163,7 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
channel.await_result(timeout_ms=1200)
self.assertEqual(channel.code, 200, channel.json_body)
- self._assert_to_device_response(channel, [])
+ self._assert_to_device_response(channel.json_body, [])
class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
@@ -4968,67 +5177,9 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
devices.register_servlets,
]
- def default_config(self) -> JsonDict:
- config = super().default_config()
- # Enable sliding sync
- config["experimental_features"] = {"msc3575_enabled": True}
- return config
-
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
- self.event_sources = hs.get_event_sources()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
- self.account_data_handler = hs.get_account_data_handler()
- self.notifier = hs.get_notifier()
- self.sync_endpoint = (
- "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
- )
-
- def _bump_notifier_wait_for_events(self, user_id: str) -> None:
- """
- Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
- Sync results.
- """
- # We're expecting some new activity from this point onwards
- from_token = self.event_sources.get_current_token()
-
- triggered_notifier_wait_for_events = False
-
- async def _on_new_acivity(
- before_token: StreamToken, after_token: StreamToken
- ) -> bool:
- nonlocal triggered_notifier_wait_for_events
- triggered_notifier_wait_for_events = True
- return True
-
- # Listen for some new activity for the user. We're just trying to confirm that
- # our bump below actually does what we think it does (triggers new activity for
- # the user).
- result_awaitable = self.notifier.wait_for_events(
- user_id,
- 1000,
- _on_new_acivity,
- from_token=from_token,
- )
-
- # Update the account data so that `notifier.wait_for_events(...)` wakes up.
- # We're bumping account data because it won't show up in the Sliding Sync
- # response so it won't affect whether we have results.
- self.get_success(
- self.account_data_handler.add_account_data_for_user(
- user_id,
- "org.matrix.foobarbaz",
- {"foo": "bar"},
- )
- )
-
- # Wait for our notifier result
- self.get_success(result_awaitable)
-
- if not triggered_notifier_wait_for_events:
- raise AssertionError(
- "Expected `notifier.wait_for_events(...)` to be triggered"
- )
def test_no_data_initial_sync(self) -> None:
"""
@@ -5039,27 +5190,22 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
user1_tok = self.login(user1_id, "pass")
# Make an initial Sliding Sync request with the e2ee extension enabled
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {},
- "extensions": {
- "e2ee": {
- "enabled": True,
- }
- },
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "e2ee": {
+ "enabled": True,
+ }
},
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Device list updates are only present for incremental syncs
- self.assertIsNone(channel.json_body["extensions"]["e2ee"].get("device_lists"))
+ self.assertIsNone(response_body["extensions"]["e2ee"].get("device_lists"))
# Both of these should be present even when empty
self.assertEqual(
- channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"],
+ response_body["extensions"]["e2ee"]["device_one_time_keys_count"],
{
# This is always present because of
# https://github.com/element-hq/element-android/issues/3725 and
@@ -5068,7 +5214,7 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
},
)
self.assertEqual(
- channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
+ response_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
[],
)
@@ -5091,29 +5237,21 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
_, from_token = self.do_sync(sync_body, tok=user1_tok)
# Make an incremental Sliding Sync request with the e2ee extension enabled
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={from_token}",
- content=sync_body,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# Device list shows up for incremental syncs
self.assertEqual(
- channel.json_body["extensions"]["e2ee"]
- .get("device_lists", {})
- .get("changed"),
+ response_body["extensions"]["e2ee"].get("device_lists", {}).get("changed"),
[],
)
self.assertEqual(
- channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
+ response_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
[],
)
# Both of these should be present even when empty
self.assertEqual(
- channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"],
+ response_body["extensions"]["e2ee"]["device_one_time_keys_count"],
{
# Note that "signed_curve25519" is always returned in key count responses
# regardless of whether we uploaded any keys for it. This is necessary until
@@ -5126,7 +5264,7 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
},
)
self.assertEqual(
- channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
+ response_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
[],
)
@@ -5231,7 +5369,9 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
- self._bump_notifier_wait_for_events(user1_id)
+ self._bump_notifier_wait_for_events(
+ user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+ )
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
@@ -5317,23 +5457,15 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
self.helper.leave(room_id, user4_id, tok=user4_tok)
# Make an incremental Sliding Sync request with the e2ee extension enabled
- channel = self.make_request(
- "POST",
- self.sync_endpoint + f"?pos={from_token}",
- content=sync_body,
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# Device list updates show up
self.assertEqual(
- channel.json_body["extensions"]["e2ee"]
- .get("device_lists", {})
- .get("changed"),
+ response_body["extensions"]["e2ee"].get("device_lists", {}).get("changed"),
[user3_id],
)
self.assertEqual(
- channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
+ response_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
[user4_id],
)
@@ -5375,24 +5507,19 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
)
# Make a Sliding Sync request with the e2ee extension enabled
- channel = self.make_request(
- "POST",
- self.sync_endpoint,
- {
- "lists": {},
- "extensions": {
- "e2ee": {
- "enabled": True,
- }
- },
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "e2ee": {
+ "enabled": True,
+ }
},
- access_token=user1_tok,
- )
- self.assertEqual(channel.code, 200, channel.json_body)
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
# Check for those one time key counts
self.assertEqual(
- channel.json_body["extensions"]["e2ee"].get("device_one_time_keys_count"),
+ response_body["extensions"]["e2ee"].get("device_one_time_keys_count"),
{
"alg1": 1,
"alg2": 2,
@@ -5436,25 +5563,747 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
self.assertEqual(fallback_res, ["alg1"], fallback_res)
# Make a Sliding Sync request with the e2ee extension enabled
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "e2ee": {
+ "enabled": True,
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ # Check for the unused fallback key types
+ self.assertListEqual(
+ response_body["extensions"]["e2ee"].get("device_unused_fallback_key_types"),
+ ["alg1"],
+ )
+
+
+class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
+ """Tests for the account_data sliding sync extension"""
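+    # (Response shape assumed by these tests, following MSC3575's account_data
+    # extension: `extensions.account_data.global` is a list of account data
+    # events and `extensions.account_data.rooms` maps room IDs to lists of
+    # room account data events.)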
+
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ sync.register_servlets,
+ sendtodevice.register_servlets,
+ ]
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.store = hs.get_datastores().main
+ self.account_data_handler = hs.get_account_data_handler()
+
+ def test_no_data_initial_sync(self) -> None:
+ """
+        Test that enabling the account_data extension works during an initial sync,
+        even if there is no data.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Make an initial Sliding Sync request with the account_data extension enabled
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ self.assertIncludes(
+ {
+ global_event["type"]
+ for global_event in response_body["extensions"]["account_data"].get(
+ "global"
+ )
+ },
+ # Even though we don't have any global account data set, Synapse saves some
+ # default push rules for us.
+ {AccountDataTypes.PUSH_RULES},
+ exact=True,
+ )
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ set(),
+ exact=True,
+ )
+
+ def test_no_data_incremental_sync(self) -> None:
+ """
+        Test that enabling the account_data extension works during an incremental
+        sync, even if there is no data.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ }
+ },
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Make an incremental Sliding Sync request with the account_data extension enabled
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ # There has been no account data changes since the `from_token` so we shouldn't
+ # see any account data here.
+ self.assertIncludes(
+ {
+ global_event["type"]
+ for global_event in response_body["extensions"]["account_data"].get(
+ "global"
+ )
+ },
+ set(),
+ exact=True,
+ )
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ set(),
+ exact=True,
+ )
+
+ def test_global_account_data_initial_sync(self) -> None:
+ """
+        On initial sync, we should return all global account data.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Update the global account data
+ self.get_success(
+ self.account_data_handler.add_account_data_for_user(
+ user_id=user1_id,
+ account_data_type="org.matrix.foobarbaz",
+ content={"foo": "bar"},
+ )
+ )
+
+ # Make an initial Sliding Sync request with the account_data extension enabled
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ # It should show us all of the global account data
+ self.assertIncludes(
+ {
+ global_event["type"]
+ for global_event in response_body["extensions"]["account_data"].get(
+ "global"
+ )
+ },
+ {AccountDataTypes.PUSH_RULES, "org.matrix.foobarbaz"},
+ exact=True,
+ )
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ set(),
+ exact=True,
+ )
+
+ def test_global_account_data_incremental_sync(self) -> None:
+ """
+        On incremental sync, we should only return account data that has changed
+        since the `from_token`.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Add some global account data
+ self.get_success(
+ self.account_data_handler.add_account_data_for_user(
+ user_id=user1_id,
+ account_data_type="org.matrix.foobarbaz",
+ content={"foo": "bar"},
+ )
+ )
+
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ }
+ },
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Add some other global account data
+ self.get_success(
+ self.account_data_handler.add_account_data_for_user(
+ user_id=user1_id,
+ account_data_type="org.matrix.doodardaz",
+ content={"doo": "dar"},
+ )
+ )
+
+ # Make an incremental Sliding Sync request with the account_data extension enabled
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ self.assertIncludes(
+ {
+ global_event["type"]
+ for global_event in response_body["extensions"]["account_data"].get(
+ "global"
+ )
+ },
+ # We should only see the new global account data that happened after the `from_token`
+ {"org.matrix.doodardaz"},
+ exact=True,
+ )
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ set(),
+ exact=True,
+ )
+
+ def test_room_account_data_initial_sync(self) -> None:
+ """
+        On initial sync, we return all account data for a given room, but only
+        for rooms that we request and that are being returned in the Sliding
+        Sync response.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Create a room and add some room account data
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+
+ # Create another room with some room account data
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+
+ # Make an initial Sliding Sync request with the account_data extension enabled
+ sync_body = {
+ "lists": {},
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ },
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ "rooms": [room_id1, room_id2],
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
+ # Even though we requested room2, we only expect room1 to show up because that's
+ # the only room in the Sliding Sync response (room2 is not one of our room
+ # subscriptions or in a sliding window list).
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ {room_id1},
+ exact=True,
+ )
+ self.assertIncludes(
+ {
+ event["type"]
+ for event in response_body["extensions"]["account_data"]
+ .get("rooms")
+ .get(room_id1)
+ },
+ {"org.matrix.roorarraz"},
+ exact=True,
+ )
+
+ def test_room_account_data_incremental_sync(self) -> None:
+ """
+        On incremental sync, we return only the account data that has changed for
+        a given room, and only for rooms that we request and that are being
+        returned in the Sliding Sync response.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Create a room and add some room account data
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+
+ # Create another room with some room account data
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+
+ sync_body = {
+ "lists": {},
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ },
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ "rooms": [room_id1, room_id2],
+ }
+ },
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Add some other room account data
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ account_data_type="org.matrix.roorarraz2",
+ content={"roo": "rar"},
+ )
+ )
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ account_data_type="org.matrix.roorarraz2",
+ content={"roo": "rar"},
+ )
+ )
+
+ # Make an incremental Sliding Sync request with the account_data extension enabled
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
+ # Even though we requested room2, we only expect room1 to show up because that's
+ # the only room in the Sliding Sync response (room2 is not one of our room
+ # subscriptions or in a sliding window list).
+ self.assertIncludes(
+ response_body["extensions"]["account_data"].get("rooms").keys(),
+ {room_id1},
+ exact=True,
+ )
+ # We should only see the new room account data that happened after the `from_token`
+ self.assertIncludes(
+ {
+ event["type"]
+ for event in response_body["extensions"]["account_data"]
+ .get("rooms")
+ .get(room_id1)
+ },
+ {"org.matrix.roorarraz2"},
+ exact=True,
+ )
+
+ def test_room_account_data_relevant_rooms(self) -> None:
+ """
+        Test out different variations of `lists`/`rooms` that we request account data for.
+ """
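+        # (Assumed semantics: the extension's `lists`/`rooms` fields filter
+        # which rooms we get account data for, but a room must also appear in
+        # the sync response itself, via a list or room subscription, to show
+        # up. Both fields default to the wildcard "*".)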
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Create a room and add some room account data
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id1,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+
+ # Create another room with some room account data
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id2,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+
+ # Create another room with some room account data
+ room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id3,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+
+ # Create another room with some room account data
+ room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id4,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+
+ # Create another room with some room account data
+ room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.get_success(
+ self.account_data_handler.add_account_data_to_room(
+ user_id=user1_id,
+ room_id=room_id5,
+ account_data_type="org.matrix.roorarraz",
+ content={"roo": "rar"},
+ )
+ )
+
+ room_id_to_human_name_map = {
+ room_id1: "room1",
+ room_id2: "room2",
+ room_id3: "room3",
+ room_id4: "room4",
+ room_id5: "room5",
+ }
+
+ # Mix lists and rooms
+ sync_body = {
+ "lists": {
+ # We expect this list range to include room5 and room4
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ # We expect this list range to include room5, room4, room3
+ "bar-list": {
+ "ranges": [[0, 2]],
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ },
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ },
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ "lists": ["foo-list", "non-existent-list"],
+ "rooms": [room_id1, room_id2, "!non-existent-room"],
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ # room1: ✅ Requested via `rooms` and a room subscription exists
+ # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
+ # room3: ❌ Not requested
+ # room4: ✅ Shows up because requested via `lists` and list exists in the response
+ # room5: ✅ Shows up because requested via `lists` and list exists in the response
+ self.assertIncludes(
+ {
+ room_id_to_human_name_map[room_id]
+ for room_id in response_body["extensions"]["account_data"]
+ .get("rooms")
+ .keys()
+ },
+ {"room1", "room4", "room5"},
+ exact=True,
+ )
+
+ # Try wildcards (this is the default)
+ sync_body = {
+ "lists": {
+ # We expect this list range to include room5 and room4
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ # We expect this list range to include room5, room4, room3
+ "bar-list": {
+ "ranges": [[0, 2]],
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ },
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ },
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ # "lists": ["*"],
+ # "rooms": ["*"],
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
+ # room2: ❌ Not requested
+ # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+ # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+ # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+ self.assertIncludes(
+ {
+ room_id_to_human_name_map[room_id]
+ for room_id in response_body["extensions"]["account_data"]
+ .get("rooms")
+ .keys()
+ },
+ {"room1", "room3", "room4", "room5"},
+ exact=True,
+ )
+
+ # Empty list will return nothing
+ sync_body = {
+ "lists": {
+ # We expect this list range to include room5 and room4
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ # We expect this list range to include room5, room4, room3
+ "bar-list": {
+ "ranges": [[0, 2]],
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ },
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ },
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ "lists": [],
+ "rooms": [],
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ # room1: ❌ Not requested
+ # room2: ❌ Not requested
+ # room3: ❌ Not requested
+ # room4: ❌ Not requested
+ # room5: ❌ Not requested
+ self.assertIncludes(
+ {
+ room_id_to_human_name_map[room_id]
+ for room_id in response_body["extensions"]["account_data"]
+ .get("rooms")
+ .keys()
+ },
+ set(),
+ exact=True,
+ )
+
+ # Try wildcard and none
+ sync_body = {
+ "lists": {
+ # We expect this list range to include room5 and room4
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ # We expect this list range to include room5, room4, room3
+ "bar-list": {
+ "ranges": [[0, 2]],
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ },
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ },
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ "lists": ["*"],
+ "rooms": [],
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ # room1: ❌ Not requested
+ # room2: ❌ Not requested
+        # room3: ✅ Shows up because of the explicit `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of the explicit `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of the explicit `lists` wildcard and is in a list
+ self.assertIncludes(
+ {
+ room_id_to_human_name_map[room_id]
+ for room_id in response_body["extensions"]["account_data"]
+ .get("rooms")
+ .keys()
+ },
+ {"room3", "room4", "room5"},
+ exact=True,
+ )
+
+ def test_wait_for_new_data(self) -> None:
+ """
+ Test to make sure that the Sliding Sync request waits for new data to arrive.
+
+ (Only applies to incremental syncs with a `timeout` specified)
+ """
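+        # (Long-poll behaviour under test: with `timeout` and `pos` set, the
+        # endpoint should block until new data arrives or the timeout expires,
+        # instead of returning an empty response immediately.)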
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id, user1_id, tok=user1_tok)
+
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ }
+ },
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Make an incremental Sliding Sync request with the account_data extension enabled
channel = self.make_request(
"POST",
- self.sync_endpoint,
+ self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+ content=sync_body,
+ access_token=user1_tok,
+ await_result=False,
+ )
+        # Block for 5 seconds to make sure we are in `notifier.wait_for_events(...)`
+ with self.assertRaises(TimedOutException):
+ channel.await_result(timeout_ms=5000)
+ # Bump the global account data to trigger new results
+ self.get_success(
+ self.account_data_handler.add_account_data_for_user(
+ user1_id,
+ "org.matrix.foobarbaz",
+ {"foo": "bar"},
+ )
+ )
+ # Should respond before the 10 second timeout
+ channel.await_result(timeout_ms=3000)
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # We should see the global account data update
+ self.assertIncludes(
{
- "lists": {},
- "extensions": {
- "e2ee": {
- "enabled": True,
- }
- },
+ global_event["type"]
+ for global_event in channel.json_body["extensions"]["account_data"].get(
+ "global"
+ )
+ },
+ {"org.matrix.foobarbaz"},
+ exact=True,
+ )
+ self.assertIncludes(
+ channel.json_body["extensions"]["account_data"].get("rooms").keys(),
+ set(),
+ exact=True,
+ )
+
+ def test_wait_for_new_data_timeout(self) -> None:
+ """
+ Test to make sure that the Sliding Sync request waits for new data to arrive but
+        no data ever arrives, so we time out. We're also making sure that the default data
+ from the account_data extension doesn't trigger a false-positive for new data.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "account_data": {
+ "enabled": True,
+ }
},
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+ content=sync_body,
access_token=user1_tok,
+ await_result=False,
)
+        # Block for 5 seconds to make sure we are in `notifier.wait_for_events(...)`
+ with self.assertRaises(TimedOutException):
+ channel.await_result(timeout_ms=5000)
+ # Wake-up `notifier.wait_for_events(...)` that will cause us test
+ # `SlidingSyncResult.__bool__` for new results.
+ self._bump_notifier_wait_for_events(
+ user1_id,
+ # We choose `StreamKeyType.PRESENCE` because we're testing for account data
+ # and don't want to contaminate the account data results using
+ # `StreamKeyType.ACCOUNT_DATA`.
+ wake_stream_key=StreamKeyType.PRESENCE,
+ )
+ # Block for a little bit more to ensure we don't see any new results.
+ with self.assertRaises(TimedOutException):
+ channel.await_result(timeout_ms=4000)
+ # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+ # 5000 + 4000 + 1200 > 10000)
+ channel.await_result(timeout_ms=1200)
self.assertEqual(channel.code, 200, channel.json_body)
- # Check for the unused fallback key types
- self.assertListEqual(
- channel.json_body["extensions"]["e2ee"].get(
- "device_unused_fallback_key_types"
- ),
- ["alg1"],
+ self.assertIsNotNone(
+ channel.json_body["extensions"]["account_data"].get("global")
+ )
+ self.assertIsNotNone(
+ channel.json_body["extensions"]["account_data"].get("rooms")
)
|