diff options
author | Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> | 2021-05-18 14:13:45 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-18 14:13:45 +0100 |
commit | 4d6e5a5e995590efe44855d10dcd2a89b841dae8 (patch) | |
tree | 793277f5bfeebe433857743707620660122aa3dd /synapse | |
parent | Fix the allowed range of valid ordering characters for spaces. (#10002) (diff) | |
download | synapse-4d6e5a5e995590efe44855d10dcd2a89b841dae8.tar.xz |
Use a database table to hold the users that should have full presence sent to them, instead of something in-memory (#9823)
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/presence.py | 136 | ||||
-rw-r--r-- | synapse/module_api/__init__.py | 63 | ||||
-rw-r--r-- | synapse/replication/http/presence.py | 11 | ||||
-rw-r--r-- | synapse/rest/admin/server_notice_servlet.py | 8 | ||||
-rw-r--r-- | synapse/storage/databases/main/presence.py | 58 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql | 34 |
6 files changed, 245 insertions, 65 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6fd1f34289..f5a049d754 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -222,9 +222,21 @@ class BasePresenceHandler(abc.ABC): @abc.abstractmethod async def set_state( - self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + self, + target_user: UserID, + state: JsonDict, + ignore_status_msg: bool = False, + force_notify: bool = False, ) -> None: - """Set the presence state of the user. """ + """Set the presence state of the user. + + Args: + target_user: The ID of the user to set the presence state of. + state: The presence state as a JSON dictionary. + ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. + If False, the user's current status will be updated. + force_notify: Whether to force notification of the update to clients. + """ @abc.abstractmethod async def bump_presence_active_time(self, user: UserID): @@ -296,6 +308,51 @@ class BasePresenceHandler(abc.ABC): for destinations, states in hosts_and_states: self._federation.send_presence_to_destinations(states, destinations) + async def send_full_presence_to_users(self, user_ids: Collection[str]): + """ + Adds to the list of users who should receive a full snapshot of presence + upon their next sync. Note that this only works for local users. + + Then, grabs the current presence state for a given set of users and adds it + to the top of the presence stream. + + Args: + user_ids: The IDs of the local users to send full presence to. + """ + # Retrieve one of the users from the given set + if not user_ids: + raise Exception( + "send_full_presence_to_users must be called with at least one user" + ) + user_id = next(iter(user_ids)) + + # Mark all users as receiving full presence on their next sync + await self.store.add_users_to_send_full_presence_to(user_ids) + + # Add a new entry to the presence stream. Since we use stream tokens to determine whether a + # local user should receive a full snapshot of presence when they sync, we need to bump the + # presence stream so that subsequent syncs with no presence activity in between won't result + # in the client receiving multiple full snapshots of presence. + # + # If we bump the stream ID, then the user will get a higher stream token next sync, and thus + # correctly won't receive a second snapshot. + + # Get the current presence state for one of the users (defaults to offline if not found) + current_presence_state = await self.get_state(UserID.from_string(user_id)) + + # Convert the UserPresenceState object into a serializable dict + state = { + "presence": current_presence_state.state, + "status_message": current_presence_state.status_msg, + } + + # Copy the presence state to the tip of the presence stream. + + # We set force_notify=True here so that this presence update is guaranteed to + # increment the presence stream ID (which resending the current user's presence + # otherwise would not do). + await self.set_state(UserID.from_string(user_id), state, force_notify=True) + class _NullContextManager(ContextManager[None]): """A context manager which does nothing.""" @@ -480,8 +537,17 @@ class WorkerPresenceHandler(BasePresenceHandler): target_user: UserID, state: JsonDict, ignore_status_msg: bool = False, + force_notify: bool = False, ) -> None: - """Set the presence state of the user.""" + """Set the presence state of the user. + + Args: + target_user: The ID of the user to set the presence state of. + state: The presence state as a JSON dictionary. + ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. + If False, the user's current status will be updated. + force_notify: Whether to force notification of the update to clients. + """ presence = state["presence"] valid_presence = ( @@ -508,6 +574,7 @@ class WorkerPresenceHandler(BasePresenceHandler): user_id=user_id, state=state, ignore_status_msg=ignore_status_msg, + force_notify=force_notify, ) async def bump_presence_active_time(self, user: UserID) -> None: @@ -677,13 +744,19 @@ class PresenceHandler(BasePresenceHandler): [self.user_to_current_state[user_id] for user_id in unpersisted] ) - async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: + async def _update_states( + self, new_states: Iterable[UserPresenceState], force_notify: bool = False + ) -> None: """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state should be sent to clients/servers. Args: new_states: The new user presence state updates to process. + force_notify: Whether to force notifying clients of this presence state update, + even if it doesn't change the state of a user's presence (e.g online -> online). + This is currently used to bump the max presence stream ID without changing any + user's presence (see PresenceHandler.add_users_to_send_full_presence_to). """ now = self.clock.time_msec() @@ -720,6 +793,9 @@ class PresenceHandler(BasePresenceHandler): now=now, ) + if force_notify: + should_notify = True + self.user_to_current_state[user_id] = new_state if should_notify: @@ -1058,9 +1134,21 @@ class PresenceHandler(BasePresenceHandler): await self._update_states(updates) async def set_state( - self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + self, + target_user: UserID, + state: JsonDict, + ignore_status_msg: bool = False, + force_notify: bool = False, ) -> None: - """Set the presence state of the user.""" + """Set the presence state of the user. + + Args: + target_user: The ID of the user to set the presence state of. + state: The presence state as a JSON dictionary. + ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. + If False, the user's current status will be updated. + force_notify: Whether to force notification of the update to clients. + """ status_msg = state.get("status_msg", None) presence = state["presence"] @@ -1091,7 +1179,9 @@ class PresenceHandler(BasePresenceHandler): ): new_fields["last_active_ts"] = self.clock.time_msec() - await self._update_states([prev_state.copy_and_replace(**new_fields)]) + await self._update_states( + [prev_state.copy_and_replace(**new_fields)], force_notify=force_notify + ) async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: """Returns whether a user can see another user's presence.""" @@ -1389,11 +1479,10 @@ class PresenceEventSource: # # Presence -> Notifier -> PresenceEventSource -> Presence # - # Same with get_module_api, get_presence_router + # Same with get_presence_router: # # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler self.get_presence_handler = hs.get_presence_handler - self.get_module_api = hs.get_module_api self.get_presence_router = hs.get_presence_router self.clock = hs.get_clock() self.store = hs.get_datastore() @@ -1424,16 +1513,21 @@ class PresenceEventSource: stream_change_cache = self.store.presence_stream_cache with Measure(self.clock, "presence.get_new_events"): - if user_id in self.get_module_api()._send_full_presence_to_local_users: - # This user has been specified by a module to receive all current, online - # user presence. Removing from_key and setting include_offline to false - # will do effectively this. - from_key = None - include_offline = False - if from_key is not None: from_key = int(from_key) + # Check if this user should receive all current, online user presence. We only + # bother to do this if from_key is set, as otherwise the user will receive all + # user presence anyways. + if await self.store.should_user_receive_full_presence_with_token( + user_id, from_key + ): + # This user has been specified by a module to receive all current, online + # user presence. Removing from_key and setting include_offline to false + # will do effectively this. + from_key = None + include_offline = False + max_token = self.store.get_current_presence_token() if from_key == max_token: # This is necessary as due to the way stream ID generators work @@ -1467,12 +1561,6 @@ class PresenceEventSource: user_id, include_offline, from_key ) - # Remove the user from the list of users to receive all presence - if user_id in self.get_module_api()._send_full_presence_to_local_users: - self.get_module_api()._send_full_presence_to_local_users.remove( - user_id - ) - return presence_updates, max_token # Make mypy happy. users_interested_in should now be a set @@ -1522,10 +1610,6 @@ class PresenceEventSource: ) presence_updates = list(users_to_state.values()) - # Remove the user from the list of users to receive all presence - if user_id in self.get_module_api()._send_full_presence_to_local_users: - self.get_module_api()._send_full_presence_to_local_users.remove(user_id) - if not include_offline: # Filter out offline presence states presence_updates = self._filter_offline_presence_state(presence_updates) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index a1a2b9aecc..cecdc96bf5 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -56,14 +56,6 @@ class ModuleApi: self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient self._public_room_list_manager = PublicRoomListManager(hs) - # The next time these users sync, they will receive the current presence - # state of all local users. Users are added by send_local_online_presence_to, - # and removed after a successful sync. - # - # We make this a private variable to deter modules from accessing it directly, - # though other classes in Synapse will still do so. - self._send_full_presence_to_local_users = set() - @property def http_client(self): """Allows making outbound HTTP requests to remote resources. @@ -405,39 +397,44 @@ class ModuleApi: Updates to remote users will be sent immediately, whereas local users will receive them on their next sync attempt. - Note that this method can only be run on the main or federation_sender worker - processes. + Note that this method can only be run on the process that is configured to write to the + presence stream. By default this is the main process. """ - if not self._hs.should_send_federation(): + if self._hs._instance_name not in self._hs.config.worker.writers.presence: raise Exception( "send_local_online_presence_to can only be run " - "on processes that send federation", + "on the process that is configured to write to the " + "presence stream (by default this is the main process)", ) + local_users = set() + remote_users = set() for user in users: if self._hs.is_mine_id(user): - # Modify SyncHandler._generate_sync_entry_for_presence to call - # presence_source.get_new_events with an empty `from_key` if - # that user's ID were in a list modified by ModuleApi somewhere. - # That user would then get all presence state on next incremental sync. - - # Force a presence initial_sync for this user next time - self._send_full_presence_to_local_users.add(user) + local_users.add(user) else: - # Retrieve presence state for currently online users that this user - # is considered interested in - presence_events, _ = await self._presence_stream.get_new_events( - UserID.from_string(user), from_key=None, include_offline=False - ) - - # Send to remote destinations. - - # We pull out the presence handler here to break a cyclic - # dependency between the presence router and module API. - presence_handler = self._hs.get_presence_handler() - await presence_handler.maybe_send_presence_to_interested_destinations( - presence_events - ) + remote_users.add(user) + + # We pull out the presence handler here to break a cyclic + # dependency between the presence router and module API. + presence_handler = self._hs.get_presence_handler() + + if local_users: + # Force a presence initial_sync for these users next time they sync. + await presence_handler.send_full_presence_to_users(local_users) + + for user in remote_users: + # Retrieve presence state for currently online users that this user + # is considered interested in. + presence_events, _ = await self._presence_stream.get_new_events( + UserID.from_string(user), from_key=None, include_offline=False + ) + + # Send to remote destinations. + destination = UserID.from_string(user).domain + presence_handler.get_federation_queue().send_presence_to_destinations( + presence_events, destination + ) class PublicRoomListManager: diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index f25307620d..bb00247953 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -73,6 +73,7 @@ class ReplicationPresenceSetState(ReplicationEndpoint): { "state": { ... }, "ignore_status_msg": false, + "force_notify": false } 200 OK @@ -91,17 +92,23 @@ class ReplicationPresenceSetState(ReplicationEndpoint): self._presence_handler = hs.get_presence_handler() @staticmethod - async def _serialize_payload(user_id, state, ignore_status_msg=False): + async def _serialize_payload( + user_id, state, ignore_status_msg=False, force_notify=False + ): return { "state": state, "ignore_status_msg": ignore_status_msg, + "force_notify": force_notify, } async def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) await self._presence_handler.set_state( - UserID.from_string(user_id), content["state"], content["ignore_status_msg"] + UserID.from_string(user_id), + content["state"], + content["ignore_status_msg"], + content["force_notify"], ) return ( diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py index cc3ab5854b..b5e4c474ef 100644 --- a/synapse/rest/admin/server_notice_servlet.py +++ b/synapse/rest/admin/server_notice_servlet.py @@ -54,7 +54,6 @@ class SendServerNoticeServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.txns = HttpTransactionCache(hs) - self.snm = hs.get_server_notices_manager() def register(self, json_resource: HttpServer): PATTERN = "/send_server_notice" @@ -77,7 +76,10 @@ class SendServerNoticeServlet(RestServlet): event_type = body.get("type", EventTypes.Message) state_key = body.get("state_key") - if not self.snm.is_enabled(): + # We grab the server notices manager here as its initialisation has a check for worker processes, + # but worker processes still need to initialise SendServerNoticeServlet (as it is part of the + # admin api). + if not self.hs.get_server_notices_manager().is_enabled(): raise SynapseError(400, "Server notices are not enabled on this server") user_id = body["user_id"] @@ -85,7 +87,7 @@ class SendServerNoticeServlet(RestServlet): if not self.hs.is_mine_id(user_id): raise SynapseError(400, "Server notices can only be sent to local users") - event = await self.snm.send_notice( + event = await self.hs.get_server_notices_manager().send_notice( user_id=body["user_id"], type=event_type, state_key=state_key, diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index db22fab23e..669a2af884 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple from synapse.api.presence import PresenceState, UserPresenceState from synapse.replication.tcp.streams import PresenceStream @@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore): db_conn, "presence_stream", "stream_id" ) + self.hs = hs self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( @@ -210,6 +211,61 @@ class PresenceStore(SQLBaseStore): return {row["user_id"]: UserPresenceState(**row) for row in rows} + async def should_user_receive_full_presence_with_token( + self, + user_id: str, + from_token: int, + ) -> bool: + """Check whether the given user should receive full presence using the stream token + they're updating from. + + Args: + user_id: The ID of the user to check. + from_token: The stream token included in their /sync token. + + Returns: + True if the user should have full presence sent to them, False otherwise. + """ + + def _should_user_receive_full_presence_with_token_txn(txn): + sql = """ + SELECT 1 FROM users_to_send_full_presence_to + WHERE user_id = ? + AND presence_stream_id >= ? + """ + txn.execute(sql, (user_id, from_token)) + return bool(txn.fetchone()) + + return await self.db_pool.runInteraction( + "should_user_receive_full_presence_with_token", + _should_user_receive_full_presence_with_token_txn, + ) + + async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): + """Adds to the list of users who should receive a full snapshot of presence + upon their next sync. + + Args: + user_ids: An iterable of user IDs. + """ + # Add user entries to the table, updating the presence_stream_id column if the user already + # exists in the table. + await self.db_pool.simple_upsert_many( + table="users_to_send_full_presence_to", + key_names=("user_id",), + key_values=[(user_id,) for user_id in user_ids], + value_names=("presence_stream_id",), + # We save the current presence stream ID token along with the user ID entry so + # that when a user /sync's, even if they syncing multiple times across separate + # devices at different times, each device will receive full presence once - when + # the presence stream ID in their sync token is less than the one in the table + # for their user ID. + value_values=( + (self._presence_id_gen.get_current_token(),) for _ in user_ids + ), + desc="add_users_to_send_full_presence_to", + ) + async def get_presence_for_all_users( self, include_offline: bool = True, diff --git a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql new file mode 100644 index 0000000000..07b0f53ecf --- /dev/null +++ b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql @@ -0,0 +1,34 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a table that keeps track of a list of users who should, upon their next +-- sync request, receive presence for all currently online users that they are +-- "interested" in. + +-- The motivation for a DB table over an in-memory list is so that this list +-- can be added to and retrieved from by any worker. Specifically, we don't +-- want to duplicate work across multiple sync workers. + +CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to( + -- The user ID to send full presence to. + user_id TEXT PRIMARY KEY, + -- A presence stream ID token - the current presence stream token when the row was last upserted. + -- If a user calls /sync and this token is part of the update they're to receive, we also include + -- full user presence in the response. + -- This allows multiple devices for a user to receive full presence whenever they next call /sync. + presence_stream_id BIGINT, + FOREIGN KEY (user_id) + REFERENCES users (name) +); \ No newline at end of file |