diff options
-rw-r--r-- | changelog.d/9823.misc | 1 | ||||
-rw-r--r-- | docs/presence_router_module.md | 6 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 136 | ||||
-rw-r--r-- | synapse/module_api/__init__.py | 63 | ||||
-rw-r--r-- | synapse/replication/http/presence.py | 11 | ||||
-rw-r--r-- | synapse/rest/admin/server_notice_servlet.py | 8 | ||||
-rw-r--r-- | synapse/storage/databases/main/presence.py | 58 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql | 34 | ||||
-rw-r--r-- | tests/events/test_presence_router.py | 15 | ||||
-rw-r--r-- | tests/module_api/test_api.py | 303 | ||||
-rw-r--r-- | tests/replication/test_sharded_event_persister.py | 2 |
11 files changed, 479 insertions, 158 deletions
diff --git a/changelog.d/9823.misc b/changelog.d/9823.misc new file mode 100644 index 0000000000..bf924ab68c --- /dev/null +++ b/changelog.d/9823.misc @@ -0,0 +1 @@ +Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`. \ No newline at end of file diff --git a/docs/presence_router_module.md b/docs/presence_router_module.md index d6566d978d..d2844915df 100644 --- a/docs/presence_router_module.md +++ b/docs/presence_router_module.md @@ -28,7 +28,11 @@ async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None which can be given a list of local or remote MXIDs to broadcast known, online user presence to (for those users that the receiving user is considered interested in). It does not include state for users who are currently offline, and it can only be -called on workers that support sending federation. +called on workers that support sending federation. Additionally, this method must +only be called from the process that has been configured to write to the +the [presence stream](https://github.com/matrix-org/synapse/blob/master/docs/workers.md#stream-writers). +By default, this is the main process, but another worker can be configured to do +so. ### Module structure diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6fd1f34289..f5a049d754 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -222,9 +222,21 @@ class BasePresenceHandler(abc.ABC): @abc.abstractmethod async def set_state( - self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + self, + target_user: UserID, + state: JsonDict, + ignore_status_msg: bool = False, + force_notify: bool = False, ) -> None: - """Set the presence state of the user. """ + """Set the presence state of the user. + + Args: + target_user: The ID of the user to set the presence state of. + state: The presence state as a JSON dictionary. + ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. + If False, the user's current status will be updated. + force_notify: Whether to force notification of the update to clients. + """ @abc.abstractmethod async def bump_presence_active_time(self, user: UserID): @@ -296,6 +308,51 @@ class BasePresenceHandler(abc.ABC): for destinations, states in hosts_and_states: self._federation.send_presence_to_destinations(states, destinations) + async def send_full_presence_to_users(self, user_ids: Collection[str]): + """ + Adds to the list of users who should receive a full snapshot of presence + upon their next sync. Note that this only works for local users. + + Then, grabs the current presence state for a given set of users and adds it + to the top of the presence stream. + + Args: + user_ids: The IDs of the local users to send full presence to. + """ + # Retrieve one of the users from the given set + if not user_ids: + raise Exception( + "send_full_presence_to_users must be called with at least one user" + ) + user_id = next(iter(user_ids)) + + # Mark all users as receiving full presence on their next sync + await self.store.add_users_to_send_full_presence_to(user_ids) + + # Add a new entry to the presence stream. Since we use stream tokens to determine whether a + # local user should receive a full snapshot of presence when they sync, we need to bump the + # presence stream so that subsequent syncs with no presence activity in between won't result + # in the client receiving multiple full snapshots of presence. + # + # If we bump the stream ID, then the user will get a higher stream token next sync, and thus + # correctly won't receive a second snapshot. + + # Get the current presence state for one of the users (defaults to offline if not found) + current_presence_state = await self.get_state(UserID.from_string(user_id)) + + # Convert the UserPresenceState object into a serializable dict + state = { + "presence": current_presence_state.state, + "status_message": current_presence_state.status_msg, + } + + # Copy the presence state to the tip of the presence stream. + + # We set force_notify=True here so that this presence update is guaranteed to + # increment the presence stream ID (which resending the current user's presence + # otherwise would not do). + await self.set_state(UserID.from_string(user_id), state, force_notify=True) + class _NullContextManager(ContextManager[None]): """A context manager which does nothing.""" @@ -480,8 +537,17 @@ class WorkerPresenceHandler(BasePresenceHandler): target_user: UserID, state: JsonDict, ignore_status_msg: bool = False, + force_notify: bool = False, ) -> None: - """Set the presence state of the user.""" + """Set the presence state of the user. + + Args: + target_user: The ID of the user to set the presence state of. + state: The presence state as a JSON dictionary. + ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. + If False, the user's current status will be updated. + force_notify: Whether to force notification of the update to clients. + """ presence = state["presence"] valid_presence = ( @@ -508,6 +574,7 @@ class WorkerPresenceHandler(BasePresenceHandler): user_id=user_id, state=state, ignore_status_msg=ignore_status_msg, + force_notify=force_notify, ) async def bump_presence_active_time(self, user: UserID) -> None: @@ -677,13 +744,19 @@ class PresenceHandler(BasePresenceHandler): [self.user_to_current_state[user_id] for user_id in unpersisted] ) - async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: + async def _update_states( + self, new_states: Iterable[UserPresenceState], force_notify: bool = False + ) -> None: """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state should be sent to clients/servers. Args: new_states: The new user presence state updates to process. + force_notify: Whether to force notifying clients of this presence state update, + even if it doesn't change the state of a user's presence (e.g online -> online). + This is currently used to bump the max presence stream ID without changing any + user's presence (see PresenceHandler.add_users_to_send_full_presence_to). """ now = self.clock.time_msec() @@ -720,6 +793,9 @@ class PresenceHandler(BasePresenceHandler): now=now, ) + if force_notify: + should_notify = True + self.user_to_current_state[user_id] = new_state if should_notify: @@ -1058,9 +1134,21 @@ class PresenceHandler(BasePresenceHandler): await self._update_states(updates) async def set_state( - self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + self, + target_user: UserID, + state: JsonDict, + ignore_status_msg: bool = False, + force_notify: bool = False, ) -> None: - """Set the presence state of the user.""" + """Set the presence state of the user. + + Args: + target_user: The ID of the user to set the presence state of. + state: The presence state as a JSON dictionary. + ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. + If False, the user's current status will be updated. + force_notify: Whether to force notification of the update to clients. + """ status_msg = state.get("status_msg", None) presence = state["presence"] @@ -1091,7 +1179,9 @@ class PresenceHandler(BasePresenceHandler): ): new_fields["last_active_ts"] = self.clock.time_msec() - await self._update_states([prev_state.copy_and_replace(**new_fields)]) + await self._update_states( + [prev_state.copy_and_replace(**new_fields)], force_notify=force_notify + ) async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: """Returns whether a user can see another user's presence.""" @@ -1389,11 +1479,10 @@ class PresenceEventSource: # # Presence -> Notifier -> PresenceEventSource -> Presence # - # Same with get_module_api, get_presence_router + # Same with get_presence_router: # # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler self.get_presence_handler = hs.get_presence_handler - self.get_module_api = hs.get_module_api self.get_presence_router = hs.get_presence_router self.clock = hs.get_clock() self.store = hs.get_datastore() @@ -1424,16 +1513,21 @@ class PresenceEventSource: stream_change_cache = self.store.presence_stream_cache with Measure(self.clock, "presence.get_new_events"): - if user_id in self.get_module_api()._send_full_presence_to_local_users: - # This user has been specified by a module to receive all current, online - # user presence. Removing from_key and setting include_offline to false - # will do effectively this. - from_key = None - include_offline = False - if from_key is not None: from_key = int(from_key) + # Check if this user should receive all current, online user presence. We only + # bother to do this if from_key is set, as otherwise the user will receive all + # user presence anyways. + if await self.store.should_user_receive_full_presence_with_token( + user_id, from_key + ): + # This user has been specified by a module to receive all current, online + # user presence. Removing from_key and setting include_offline to false + # will do effectively this. + from_key = None + include_offline = False + max_token = self.store.get_current_presence_token() if from_key == max_token: # This is necessary as due to the way stream ID generators work @@ -1467,12 +1561,6 @@ class PresenceEventSource: user_id, include_offline, from_key ) - # Remove the user from the list of users to receive all presence - if user_id in self.get_module_api()._send_full_presence_to_local_users: - self.get_module_api()._send_full_presence_to_local_users.remove( - user_id - ) - return presence_updates, max_token # Make mypy happy. users_interested_in should now be a set @@ -1522,10 +1610,6 @@ class PresenceEventSource: ) presence_updates = list(users_to_state.values()) - # Remove the user from the list of users to receive all presence - if user_id in self.get_module_api()._send_full_presence_to_local_users: - self.get_module_api()._send_full_presence_to_local_users.remove(user_id) - if not include_offline: # Filter out offline presence states presence_updates = self._filter_offline_presence_state(presence_updates) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index a1a2b9aecc..cecdc96bf5 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -56,14 +56,6 @@ class ModuleApi: self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient self._public_room_list_manager = PublicRoomListManager(hs) - # The next time these users sync, they will receive the current presence - # state of all local users. Users are added by send_local_online_presence_to, - # and removed after a successful sync. - # - # We make this a private variable to deter modules from accessing it directly, - # though other classes in Synapse will still do so. - self._send_full_presence_to_local_users = set() - @property def http_client(self): """Allows making outbound HTTP requests to remote resources. @@ -405,39 +397,44 @@ class ModuleApi: Updates to remote users will be sent immediately, whereas local users will receive them on their next sync attempt. - Note that this method can only be run on the main or federation_sender worker - processes. + Note that this method can only be run on the process that is configured to write to the + presence stream. By default this is the main process. """ - if not self._hs.should_send_federation(): + if self._hs._instance_name not in self._hs.config.worker.writers.presence: raise Exception( "send_local_online_presence_to can only be run " - "on processes that send federation", + "on the process that is configured to write to the " + "presence stream (by default this is the main process)", ) + local_users = set() + remote_users = set() for user in users: if self._hs.is_mine_id(user): - # Modify SyncHandler._generate_sync_entry_for_presence to call - # presence_source.get_new_events with an empty `from_key` if - # that user's ID were in a list modified by ModuleApi somewhere. - # That user would then get all presence state on next incremental sync. - - # Force a presence initial_sync for this user next time - self._send_full_presence_to_local_users.add(user) + local_users.add(user) else: - # Retrieve presence state for currently online users that this user - # is considered interested in - presence_events, _ = await self._presence_stream.get_new_events( - UserID.from_string(user), from_key=None, include_offline=False - ) - - # Send to remote destinations. - - # We pull out the presence handler here to break a cyclic - # dependency between the presence router and module API. - presence_handler = self._hs.get_presence_handler() - await presence_handler.maybe_send_presence_to_interested_destinations( - presence_events - ) + remote_users.add(user) + + # We pull out the presence handler here to break a cyclic + # dependency between the presence router and module API. + presence_handler = self._hs.get_presence_handler() + + if local_users: + # Force a presence initial_sync for these users next time they sync. + await presence_handler.send_full_presence_to_users(local_users) + + for user in remote_users: + # Retrieve presence state for currently online users that this user + # is considered interested in. + presence_events, _ = await self._presence_stream.get_new_events( + UserID.from_string(user), from_key=None, include_offline=False + ) + + # Send to remote destinations. + destination = UserID.from_string(user).domain + presence_handler.get_federation_queue().send_presence_to_destinations( + presence_events, destination + ) class PublicRoomListManager: diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index f25307620d..bb00247953 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -73,6 +73,7 @@ class ReplicationPresenceSetState(ReplicationEndpoint): { "state": { ... }, "ignore_status_msg": false, + "force_notify": false } 200 OK @@ -91,17 +92,23 @@ class ReplicationPresenceSetState(ReplicationEndpoint): self._presence_handler = hs.get_presence_handler() @staticmethod - async def _serialize_payload(user_id, state, ignore_status_msg=False): + async def _serialize_payload( + user_id, state, ignore_status_msg=False, force_notify=False + ): return { "state": state, "ignore_status_msg": ignore_status_msg, + "force_notify": force_notify, } async def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) await self._presence_handler.set_state( - UserID.from_string(user_id), content["state"], content["ignore_status_msg"] + UserID.from_string(user_id), + content["state"], + content["ignore_status_msg"], + content["force_notify"], ) return ( diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py index cc3ab5854b..b5e4c474ef 100644 --- a/synapse/rest/admin/server_notice_servlet.py +++ b/synapse/rest/admin/server_notice_servlet.py @@ -54,7 +54,6 @@ class SendServerNoticeServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.txns = HttpTransactionCache(hs) - self.snm = hs.get_server_notices_manager() def register(self, json_resource: HttpServer): PATTERN = "/send_server_notice" @@ -77,7 +76,10 @@ class SendServerNoticeServlet(RestServlet): event_type = body.get("type", EventTypes.Message) state_key = body.get("state_key") - if not self.snm.is_enabled(): + # We grab the server notices manager here as its initialisation has a check for worker processes, + # but worker processes still need to initialise SendServerNoticeServlet (as it is part of the + # admin api). + if not self.hs.get_server_notices_manager().is_enabled(): raise SynapseError(400, "Server notices are not enabled on this server") user_id = body["user_id"] @@ -85,7 +87,7 @@ class SendServerNoticeServlet(RestServlet): if not self.hs.is_mine_id(user_id): raise SynapseError(400, "Server notices can only be sent to local users") - event = await self.snm.send_notice( + event = await self.hs.get_server_notices_manager().send_notice( user_id=body["user_id"], type=event_type, state_key=state_key, diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index db22fab23e..669a2af884 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple from synapse.api.presence import PresenceState, UserPresenceState from synapse.replication.tcp.streams import PresenceStream @@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore): db_conn, "presence_stream", "stream_id" ) + self.hs = hs self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( @@ -210,6 +211,61 @@ class PresenceStore(SQLBaseStore): return {row["user_id"]: UserPresenceState(**row) for row in rows} + async def should_user_receive_full_presence_with_token( + self, + user_id: str, + from_token: int, + ) -> bool: + """Check whether the given user should receive full presence using the stream token + they're updating from. + + Args: + user_id: The ID of the user to check. + from_token: The stream token included in their /sync token. + + Returns: + True if the user should have full presence sent to them, False otherwise. + """ + + def _should_user_receive_full_presence_with_token_txn(txn): + sql = """ + SELECT 1 FROM users_to_send_full_presence_to + WHERE user_id = ? + AND presence_stream_id >= ? + """ + txn.execute(sql, (user_id, from_token)) + return bool(txn.fetchone()) + + return await self.db_pool.runInteraction( + "should_user_receive_full_presence_with_token", + _should_user_receive_full_presence_with_token_txn, + ) + + async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): + """Adds to the list of users who should receive a full snapshot of presence + upon their next sync. + + Args: + user_ids: An iterable of user IDs. + """ + # Add user entries to the table, updating the presence_stream_id column if the user already + # exists in the table. + await self.db_pool.simple_upsert_many( + table="users_to_send_full_presence_to", + key_names=("user_id",), + key_values=[(user_id,) for user_id in user_ids], + value_names=("presence_stream_id",), + # We save the current presence stream ID token along with the user ID entry so + # that when a user /sync's, even if they syncing multiple times across separate + # devices at different times, each device will receive full presence once - when + # the presence stream ID in their sync token is less than the one in the table + # for their user ID. + value_values=( + (self._presence_id_gen.get_current_token(),) for _ in user_ids + ), + desc="add_users_to_send_full_presence_to", + ) + async def get_presence_for_all_users( self, include_offline: bool = True, diff --git a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql new file mode 100644 index 0000000000..07b0f53ecf --- /dev/null +++ b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql @@ -0,0 +1,34 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a table that keeps track of a list of users who should, upon their next +-- sync request, receive presence for all currently online users that they are +-- "interested" in. + +-- The motivation for a DB table over an in-memory list is so that this list +-- can be added to and retrieved from by any worker. Specifically, we don't +-- want to duplicate work across multiple sync workers. + +CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to( + -- The user ID to send full presence to. + user_id TEXT PRIMARY KEY, + -- A presence stream ID token - the current presence stream token when the row was last upserted. + -- If a user calls /sync and this token is part of the update they're to receive, we also include + -- full user presence in the response. + -- This allows multiple devices for a user to receive full presence whenever they next call /sync. + presence_stream_id BIGINT, + FOREIGN KEY (user_id) + REFERENCES users (name) +); \ No newline at end of file diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py index 01d257307c..875b0d0a11 100644 --- a/tests/events/test_presence_router.py +++ b/tests/events/test_presence_router.py @@ -302,11 +302,18 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): ) # Check that the expected presence updates were sent - expected_users = [ + # We explicitly compare using sets as we expect that calling + # module_api.send_local_online_presence_to will create a presence + # update that is a duplicate of the specified user's current presence. + # These are sent to clients and will be picked up below, thus we use a + # set to deduplicate. We're just interested that non-offline updates were + # sent out for each user ID. + expected_users = { self.other_user_id, self.presence_receiving_user_one_id, self.presence_receiving_user_two_id, - ] + } + found_users = set() calls = ( self.hs.get_federation_transport_client().send_transaction.call_args_list @@ -326,12 +333,12 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): # EDUs can contain multiple presence updates for presence_update in edu["content"]["push"]: # Check for presence updates that contain the user IDs we're after - expected_users.remove(presence_update["user_id"]) + found_users.add(presence_update["user_id"]) # Ensure that no offline states are being sent out self.assertNotEqual(presence_update["presence"], "offline") - self.assertEqual(len(expected_users), 0) + self.assertEqual(found_users, expected_users) def send_presence_update( diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 742ad14b8c..2c68b9a13c 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -13,6 +13,8 @@ # limitations under the License. from unittest.mock import Mock +from twisted.internet import defer + from synapse.api.constants import EduTypes from synapse.events import EventBase from synapse.federation.units import Transaction @@ -22,11 +24,13 @@ from synapse.rest.client.v1 import login, presence, room from synapse.types import create_requester from tests.events.test_presence_router import send_presence_update, sync_presence +from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.test_utils.event_injection import inject_member_event -from tests.unittest import FederatingHomeserverTestCase, override_config +from tests.unittest import HomeserverTestCase, override_config +from tests.utils import USE_POSTGRES_FOR_TESTS -class ModuleApiTestCase(FederatingHomeserverTestCase): +class ModuleApiTestCase(HomeserverTestCase): servlets = [ admin.register_servlets, login.register_servlets, @@ -217,97 +221,16 @@ class ModuleApiTestCase(FederatingHomeserverTestCase): ) self.assertFalse(is_in_public_rooms) - # The ability to send federation is required by send_local_online_presence_to. - @override_config({"send_federation": True}) def test_send_local_online_presence_to(self): - """Tests that send_local_presence_to_users sends local online presence to local users.""" - # Create a user who will send presence updates - self.presence_receiver_id = self.register_user("presence_receiver", "monkey") - self.presence_receiver_tok = self.login("presence_receiver", "monkey") - - # And another user that will send presence updates out - self.presence_sender_id = self.register_user("presence_sender", "monkey") - self.presence_sender_tok = self.login("presence_sender", "monkey") - - # Put them in a room together so they will receive each other's presence updates - room_id = self.helper.create_room_as( - self.presence_receiver_id, - tok=self.presence_receiver_tok, - ) - self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok) - - # Presence sender comes online - send_presence_update( - self, - self.presence_sender_id, - self.presence_sender_tok, - "online", - "I'm online!", - ) - - # Presence receiver should have received it - presence_updates, sync_token = sync_presence(self, self.presence_receiver_id) - self.assertEqual(len(presence_updates), 1) - - presence_update = presence_updates[0] # type: UserPresenceState - self.assertEqual(presence_update.user_id, self.presence_sender_id) - self.assertEqual(presence_update.state, "online") - - # Syncing again should result in no presence updates - presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token - ) - self.assertEqual(len(presence_updates), 0) - - # Trigger sending local online presence - self.get_success( - self.module_api.send_local_online_presence_to( - [ - self.presence_receiver_id, - ] - ) - ) - - # Presence receiver should have received online presence again - presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token - ) - self.assertEqual(len(presence_updates), 1) - - presence_update = presence_updates[0] # type: UserPresenceState - self.assertEqual(presence_update.user_id, self.presence_sender_id) - self.assertEqual(presence_update.state, "online") - - # Presence sender goes offline - send_presence_update( - self, - self.presence_sender_id, - self.presence_sender_tok, - "offline", - "I slink back into the darkness.", - ) - - # Trigger sending local online presence - self.get_success( - self.module_api.send_local_online_presence_to( - [ - self.presence_receiver_id, - ] - ) - ) - - # Presence receiver should *not* have received offline state - presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token - ) - self.assertEqual(len(presence_updates), 0) + # Test sending local online presence to users from the main process + _test_sending_local_online_presence_to_local_user(self, test_with_workers=False) @override_config({"send_federation": True}) def test_send_local_online_presence_to_federation(self): """Tests that send_local_presence_to_users sends local online presence to remote users.""" # Create a user who will send presence updates - self.presence_sender_id = self.register_user("presence_sender", "monkey") - self.presence_sender_tok = self.login("presence_sender", "monkey") + self.presence_sender_id = self.register_user("presence_sender1", "monkey") + self.presence_sender_tok = self.login("presence_sender1", "monkey") # And a room they're a part of room_id = self.helper.create_room_as( @@ -374,3 +297,209 @@ class ModuleApiTestCase(FederatingHomeserverTestCase): found_update = True self.assertTrue(found_update) + + +class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase): + """For testing ModuleApi functionality in a multi-worker setup""" + + # Testing stream ID replication from the main to worker processes requires postgres + # (due to needing `MultiWriterIdGenerator`). + if not USE_POSTGRES_FOR_TESTS: + skip = "Requires Postgres" + + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + presence.register_servlets, + ] + + def default_config(self): + conf = super().default_config() + conf["redis"] = {"enabled": "true"} + conf["stream_writers"] = {"presence": ["presence_writer"]} + conf["instance_map"] = { + "presence_writer": {"host": "testserv", "port": 1001}, + } + return conf + + def prepare(self, reactor, clock, homeserver): + self.module_api = homeserver.get_module_api() + self.sync_handler = homeserver.get_sync_handler() + + def test_send_local_online_presence_to_workers(self): + # Test sending local online presence to users from a worker process + _test_sending_local_online_presence_to_local_user(self, test_with_workers=True) + + +def _test_sending_local_online_presence_to_local_user( + test_case: HomeserverTestCase, test_with_workers: bool = False +): + """Tests that send_local_presence_to_users sends local online presence to local users. + + This simultaneously tests two different usecases: + * Testing that this method works when either called from a worker or the main process. + - We test this by calling this method from both a TestCase that runs in monolith mode, and one that + runs with a main and generic_worker. + * Testing that multiple devices syncing simultaneously will all receive a snapshot of local, + online presence - but only once per device. + + Args: + test_with_workers: If True, this method will call ModuleApi.send_local_online_presence_to on a + worker process. The test users will still sync with the main process. The purpose of testing + with a worker is to check whether a Synapse module running on a worker can inform other workers/ + the main process that they should include additional presence when a user next syncs. + """ + if test_with_workers: + # Create a worker process to make module_api calls against + worker_hs = test_case.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "presence_writer"} + ) + + # Create a user who will send presence updates + test_case.presence_receiver_id = test_case.register_user( + "presence_receiver1", "monkey" + ) + test_case.presence_receiver_tok = test_case.login("presence_receiver1", "monkey") + + # And another user that will send presence updates out + test_case.presence_sender_id = test_case.register_user("presence_sender2", "monkey") + test_case.presence_sender_tok = test_case.login("presence_sender2", "monkey") + + # Put them in a room together so they will receive each other's presence updates + room_id = test_case.helper.create_room_as( + test_case.presence_receiver_id, + tok=test_case.presence_receiver_tok, + ) + test_case.helper.join( + room_id, test_case.presence_sender_id, tok=test_case.presence_sender_tok + ) + + # Presence sender comes online + send_presence_update( + test_case, + test_case.presence_sender_id, + test_case.presence_sender_tok, + "online", + "I'm online!", + ) + + # Presence receiver should have received it + presence_updates, sync_token = sync_presence( + test_case, test_case.presence_receiver_id + ) + test_case.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id) + test_case.assertEqual(presence_update.state, "online") + + if test_with_workers: + # Replicate the current sync presence token from the main process to the worker process. + # We need to do this so that the worker process knows the current presence stream ID to + # insert into the database when we call ModuleApi.send_local_online_presence_to. + test_case.replicate() + + # Syncing again should result in no presence updates + presence_updates, sync_token = sync_presence( + test_case, test_case.presence_receiver_id, sync_token + ) + test_case.assertEqual(len(presence_updates), 0) + + # We do an (initial) sync with a second "device" now, getting a new sync token. + # We'll use this in a moment. + _, sync_token_second_device = sync_presence( + test_case, test_case.presence_receiver_id + ) + + # Determine on which process (main or worker) to call ModuleApi.send_local_online_presence_to on + if test_with_workers: + module_api_to_use = worker_hs.get_module_api() + else: + module_api_to_use = test_case.module_api + + # Trigger sending local online presence. We expect this information + # to be saved to the database where all processes can access it. + # Note that we're syncing via the master. + d = module_api_to_use.send_local_online_presence_to( + [ + test_case.presence_receiver_id, + ] + ) + d = defer.ensureDeferred(d) + + if test_with_workers: + # In order for the required presence_set_state replication request to occur between the + # worker and main process, we need to pump the reactor. Otherwise, the coordinator that + # reads the request on the main process won't do so, and the request will time out. + while not d.called: + test_case.reactor.advance(0.1) + + test_case.get_success(d) + + # The presence receiver should have received online presence again. + presence_updates, sync_token = sync_presence( + test_case, test_case.presence_receiver_id, sync_token + ) + test_case.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id) + test_case.assertEqual(presence_update.state, "online") + + # We attempt to sync with the second sync token we received above - just to check that + # multiple syncing devices will each receive the necessary online presence. + presence_updates, sync_token_second_device = sync_presence( + test_case, test_case.presence_receiver_id, sync_token_second_device + ) + test_case.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id) + test_case.assertEqual(presence_update.state, "online") + + # However, if we now sync with either "device", we won't receive another burst of online presence + # until the API is called again sometime in the future + presence_updates, sync_token = sync_presence( + test_case, test_case.presence_receiver_id, sync_token + ) + + # Now we check that we don't receive *offline* updates using ModuleApi.send_local_online_presence_to. + + # Presence sender goes offline + send_presence_update( + test_case, + test_case.presence_sender_id, + test_case.presence_sender_tok, + "offline", + "I slink back into the darkness.", + ) + + # Presence receiver should have received the updated, offline state + presence_updates, sync_token = sync_presence( + test_case, test_case.presence_receiver_id, sync_token + ) + test_case.assertEqual(len(presence_updates), 1) + + # Now trigger sending local online presence. + d = module_api_to_use.send_local_online_presence_to( + [ + test_case.presence_receiver_id, + ] + ) + d = defer.ensureDeferred(d) + + if test_with_workers: + # In order for the required presence_set_state replication request to occur between the + # worker and main process, we need to pump the reactor. Otherwise, the coordinator that + # reads the request on the main process won't do so, and the request will time out. + while not d.called: + test_case.reactor.advance(0.1) + + test_case.get_success(d) + + # Presence receiver should *not* have received offline state + presence_updates, sync_token = sync_presence( + test_case, test_case.presence_receiver_id, sync_token + ) + test_case.assertEqual(len(presence_updates), 0) diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index d739eb6b17..5eca5c165d 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py @@ -30,7 +30,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase): """Checks event persisting sharding works""" # Event persister sharding requires postgres (due to needing - # `MutliWriterIdGenerator`). + # `MultiWriterIdGenerator`). if not USE_POSTGRES_FOR_TESTS: skip = "Requires Postgres" |