diff --git a/changelog.d/17481.misc b/changelog.d/17481.misc
new file mode 100644
index 0000000000..ac55538424
--- /dev/null
+++ b/changelog.d/17481.misc
@@ -0,0 +1 @@
+Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`.
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 135b677bad..a9f2b274aa 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -21,7 +21,7 @@
import json
import logging
from http import HTTPStatus
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple
from parameterized import parameterized, parameterized_class
@@ -60,6 +60,7 @@ from synapse.types import (
UserID,
)
from synapse.util import Clock
+from synapse.util.stringutils import random_string
from tests import unittest
from tests.federation.transport.test_knocking import (
@@ -1238,6 +1239,12 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
+ def default_config(self) -> JsonDict:
+ config = super().default_config()
+ # Enable sliding sync
+ config["experimental_features"] = {"msc3575_enabled": True}
+ return config
+
def do_sync(
self, sync_body: JsonDict, *, since: Optional[str] = None, tok: str
) -> Tuple[JsonDict, str]:
@@ -1268,6 +1275,88 @@ class SlidingSyncBase(unittest.HomeserverTestCase):
return channel.json_body, channel.json_body["pos"]
+ def _bump_notifier_wait_for_events(
+ self,
+ user_id: str,
+ wake_stream_key: Literal[
+ StreamKeyType.ACCOUNT_DATA,
+ StreamKeyType.PRESENCE,
+ ],
+ ) -> None:
+ """
+ Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
+ Sync results.
+
+ Args:
+ user_id: The user ID to wake up the notifier for
+ wake_stream_key: The stream key to wake up. This will create an actual new
+ entity in that stream so it's best to choose one that won't affect the
+ Sliding Sync results you're testing for. In other words, if your testing
+ account data, choose `StreamKeyType.PRESENCE` instead. We support two
+ possible stream keys because you're probably testing one or the other so
+ one is always a "safe" option.
+ """
+ # We're expecting some new activity from this point onwards
+ from_token = self.hs.get_event_sources().get_current_token()
+
+ triggered_notifier_wait_for_events = False
+
+ async def _on_new_acivity(
+ before_token: StreamToken, after_token: StreamToken
+ ) -> bool:
+ nonlocal triggered_notifier_wait_for_events
+ triggered_notifier_wait_for_events = True
+ return True
+
+ notifier = self.hs.get_notifier()
+
+ # Listen for some new activity for the user. We're just trying to confirm that
+ # our bump below actually does what we think it does (triggers new activity for
+ # the user).
+ result_awaitable = notifier.wait_for_events(
+ user_id,
+ 1000,
+ _on_new_acivity,
+ from_token=from_token,
+ )
+
+ # Update the account data or presence so that `notifier.wait_for_events(...)`
+ # wakes up. We chose these two options because they're least likely to show up
+ # in the Sliding Sync response so it won't affect whether we have results.
+ if wake_stream_key == StreamKeyType.ACCOUNT_DATA:
+ self.get_success(
+ self.hs.get_account_data_handler().add_account_data_for_user(
+ user_id,
+ "org.matrix.foobarbaz",
+ {"foo": "bar"},
+ )
+ )
+ elif wake_stream_key == StreamKeyType.PRESENCE:
+ sending_user_id = self.register_user(
+ "user_bump_notifier_wait_for_events_" + random_string(10), "pass"
+ )
+ sending_user_tok = self.login(sending_user_id, "pass")
+ test_msg = {"foo": "bar"}
+ chan = self.make_request(
+ "PUT",
+ "/_matrix/client/r0/sendToDevice/m.test/1234",
+ content={"messages": {user_id: {"d1": test_msg}}},
+ access_token=sending_user_tok,
+ )
+ self.assertEqual(chan.code, 200, chan.result)
+ else:
+ raise AssertionError(
+ "Unable to wake that stream in _bump_notifier_wait_for_events(...)"
+ )
+
+ # Wait for our notifier result
+ self.get_success(result_awaitable)
+
+ if not triggered_notifier_wait_for_events:
+ raise AssertionError(
+ "Expected `notifier.wait_for_events(...)` to be triggered"
+ )
+
class SlidingSyncTestCase(SlidingSyncBase):
"""
@@ -1282,18 +1371,10 @@ class SlidingSyncTestCase(SlidingSyncBase):
devices.register_servlets,
]
- def default_config(self) -> JsonDict:
- config = super().default_config()
- # Enable sliding sync
- config["experimental_features"] = {"msc3575_enabled": True}
- return config
-
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
self.storage_controllers = hs.get_storage_controllers()
- self.account_data_handler = hs.get_account_data_handler()
- self.notifier = hs.get_notifier()
def _assertRequiredStateIncludes(
self,
@@ -1419,52 +1500,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
return room_id
- def _bump_notifier_wait_for_events(self, user_id: str) -> None:
- """
- Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
- Sync results.
- """
- # We're expecting some new activity from this point onwards
- from_token = self.event_sources.get_current_token()
-
- triggered_notifier_wait_for_events = False
-
- async def _on_new_acivity(
- before_token: StreamToken, after_token: StreamToken
- ) -> bool:
- nonlocal triggered_notifier_wait_for_events
- triggered_notifier_wait_for_events = True
- return True
-
- # Listen for some new activity for the user. We're just trying to confirm that
- # our bump below actually does what we think it does (triggers new activity for
- # the user).
- result_awaitable = self.notifier.wait_for_events(
- user_id,
- 1000,
- _on_new_acivity,
- from_token=from_token,
- )
-
- # Update the account data so that `notifier.wait_for_events(...)` wakes up.
- # We're bumping account data because it won't show up in the Sliding Sync
- # response so it won't affect whether we have results.
- self.get_success(
- self.account_data_handler.add_account_data_for_user(
- user_id,
- "org.matrix.foobarbaz",
- {"foo": "bar"},
- )
- )
-
- # Wait for our notifier result
- self.get_success(result_awaitable)
-
- if not triggered_notifier_wait_for_events:
- raise AssertionError(
- "Expected `notifier.wait_for_events(...)` to be triggered"
- )
-
def test_sync_list(self) -> None:
"""
Test that room IDs show up in the Sliding Sync `lists`
@@ -1671,7 +1706,9 @@ class SlidingSyncTestCase(SlidingSyncBase):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
- self._bump_notifier_wait_for_events(user1_id)
+ self._bump_notifier_wait_for_events(
+ user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+ )
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
@@ -4636,67 +4673,12 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
sendtodevice.register_servlets,
]
- def default_config(self) -> JsonDict:
- config = super().default_config()
- # Enable sliding sync
- config["experimental_features"] = {"msc3575_enabled": True}
- return config
-
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
- self.event_sources = hs.get_event_sources()
- self.account_data_handler = hs.get_account_data_handler()
- self.notifier = hs.get_notifier()
self.sync_endpoint = (
"/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
)
- def _bump_notifier_wait_for_events(self, user_id: str) -> None:
- """
- Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
- Sync results.
- """
- # We're expecting some new activity from this point onwards
- from_token = self.event_sources.get_current_token()
-
- triggered_notifier_wait_for_events = False
-
- async def _on_new_acivity(
- before_token: StreamToken, after_token: StreamToken
- ) -> bool:
- nonlocal triggered_notifier_wait_for_events
- triggered_notifier_wait_for_events = True
- return True
-
- # Listen for some new activity for the user. We're just trying to confirm that
- # our bump below actually does what we think it does (triggers new activity for
- # the user).
- result_awaitable = self.notifier.wait_for_events(
- user_id,
- 1000,
- _on_new_acivity,
- from_token=from_token,
- )
-
- # Update the account data so that `notifier.wait_for_events(...)` wakes up.
- # We're bumping account data because it won't show up in the Sliding Sync
- # response so it won't affect whether we have results.
- self.get_success(
- self.account_data_handler.add_account_data_for_user(
- user_id,
- "org.matrix.foobarbaz",
- {"foo": "bar"},
- )
- )
-
- # Wait for our notifier result
- self.get_success(result_awaitable)
-
- if not triggered_notifier_wait_for_events:
- raise AssertionError(
- "Expected `notifier.wait_for_events(...)` to be triggered"
- )
-
def _assert_to_device_response(
self, channel: FakeChannel, expected_messages: List[JsonDict]
) -> str:
@@ -4945,7 +4927,9 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
- self._bump_notifier_wait_for_events(user1_id)
+ self._bump_notifier_wait_for_events(
+ user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+ )
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
@@ -4968,68 +4952,13 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
devices.register_servlets,
]
- def default_config(self) -> JsonDict:
- config = super().default_config()
- # Enable sliding sync
- config["experimental_features"] = {"msc3575_enabled": True}
- return config
-
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
- self.event_sources = hs.get_event_sources()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
- self.account_data_handler = hs.get_account_data_handler()
- self.notifier = hs.get_notifier()
self.sync_endpoint = (
"/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
)
- def _bump_notifier_wait_for_events(self, user_id: str) -> None:
- """
- Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
- Sync results.
- """
- # We're expecting some new activity from this point onwards
- from_token = self.event_sources.get_current_token()
-
- triggered_notifier_wait_for_events = False
-
- async def _on_new_acivity(
- before_token: StreamToken, after_token: StreamToken
- ) -> bool:
- nonlocal triggered_notifier_wait_for_events
- triggered_notifier_wait_for_events = True
- return True
-
- # Listen for some new activity for the user. We're just trying to confirm that
- # our bump below actually does what we think it does (triggers new activity for
- # the user).
- result_awaitable = self.notifier.wait_for_events(
- user_id,
- 1000,
- _on_new_acivity,
- from_token=from_token,
- )
-
- # Update the account data so that `notifier.wait_for_events(...)` wakes up.
- # We're bumping account data because it won't show up in the Sliding Sync
- # response so it won't affect whether we have results.
- self.get_success(
- self.account_data_handler.add_account_data_for_user(
- user_id,
- "org.matrix.foobarbaz",
- {"foo": "bar"},
- )
- )
-
- # Wait for our notifier result
- self.get_success(result_awaitable)
-
- if not triggered_notifier_wait_for_events:
- raise AssertionError(
- "Expected `notifier.wait_for_events(...)` to be triggered"
- )
-
def test_no_data_initial_sync(self) -> None:
"""
Test that enabling e2ee extension works during an intitial sync, even if there
@@ -5231,7 +5160,9 @@ class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
- self._bump_notifier_wait_for_events(user1_id)
+ self._bump_notifier_wait_for_events(
+ user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+ )
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
@@ -5471,73 +5402,14 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
sendtodevice.register_servlets,
]
- def default_config(self) -> JsonDict:
- config = super().default_config()
- # Enable sliding sync
- config["experimental_features"] = {"msc3575_enabled": True}
- return config
-
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
- self.event_sources = hs.get_event_sources()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
self.account_data_handler = hs.get_account_data_handler()
- self.notifier = hs.get_notifier()
self.sync_endpoint = (
"/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
)
- def _bump_notifier_wait_for_events(self, user_id: str) -> None:
- """
- Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
- Sync results.
- """
- # We're expecting some new activity from this point onwards
- from_token = self.event_sources.get_current_token()
-
- triggered_notifier_wait_for_events = False
-
- async def _on_new_acivity(
- before_token: StreamToken, after_token: StreamToken
- ) -> bool:
- nonlocal triggered_notifier_wait_for_events
- triggered_notifier_wait_for_events = True
- return True
-
- # Listen for some new activity for the user. We're just trying to confirm that
- # our bump below actually does what we think it does (triggers new activity for
- # the user).
- result_awaitable = self.notifier.wait_for_events(
- user_id,
- 1000,
- _on_new_acivity,
- from_token=from_token,
- )
-
- # Send a new To-Device message so that `notifier.wait_for_events(...)` wakes up.
- # We're bumping to-device because it won't show up in the Sliding Sync response
- # for this extension so it won't affect whether we have results.
- sending_user_id = self.register_user(
- "user_bump_notifier_wait_for_events", "pass"
- )
- sending_user_tok = self.login(sending_user_id, "pass")
- test_msg = {"foo": "bar"}
- chan = self.make_request(
- "PUT",
- "/_matrix/client/r0/sendToDevice/m.test/1234",
- content={"messages": {user_id: {"d1": test_msg}}},
- access_token=sending_user_tok,
- )
- self.assertEqual(chan.code, 200, chan.result)
-
- # Wait for our notifier result
- self.get_success(result_awaitable)
-
- if not triggered_notifier_wait_for_events:
- raise AssertionError(
- "Expected `notifier.wait_for_events(...)` to be triggered"
- )
-
def test_no_data_initial_sync(self) -> None:
"""
Test that enabling the account_data extension works during an intitial sync,
@@ -6229,7 +6101,13 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
- self._bump_notifier_wait_for_events(user1_id)
+ self._bump_notifier_wait_for_events(
+ user1_id,
+ # We choose `StreamKeyType.PRESENCE` because we're testing for account data
+ # and don't want to contaminate the account data results using
+ # `StreamKeyType.ACCOUNT_DATA`.
+ wake_stream_key=StreamKeyType.PRESENCE,
+ )
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
|