From 4d6e5a5e995590efe44855d10dcd2a89b841dae8 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 18 May 2021 14:13:45 +0100 Subject: Use a database table to hold the users that should have full presence sent to them, instead of something in-memory (#9823) --- synapse/storage/databases/main/presence.py | 58 +++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) (limited to 'synapse/storage/databases/main/presence.py') diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index db22fab23e..669a2af884 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple from synapse.api.presence import PresenceState, UserPresenceState from synapse.replication.tcp.streams import PresenceStream @@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore): db_conn, "presence_stream", "stream_id" ) + self.hs = hs self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( @@ -210,6 +211,61 @@ class PresenceStore(SQLBaseStore): return {row["user_id"]: UserPresenceState(**row) for row in rows} + async def should_user_receive_full_presence_with_token( + self, + user_id: str, + from_token: int, + ) -> bool: + """Check whether the given user should receive full presence using the stream token + they're updating from. + + Args: + user_id: The ID of the user to check. + from_token: The stream token included in their /sync token. + + Returns: + True if the user should have full presence sent to them, False otherwise. + """ + + def _should_user_receive_full_presence_with_token_txn(txn): + sql = """ + SELECT 1 FROM users_to_send_full_presence_to + WHERE user_id = ? + AND presence_stream_id >= ? + """ + txn.execute(sql, (user_id, from_token)) + return bool(txn.fetchone()) + + return await self.db_pool.runInteraction( + "should_user_receive_full_presence_with_token", + _should_user_receive_full_presence_with_token_txn, + ) + + async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): + """Adds to the list of users who should receive a full snapshot of presence + upon their next sync. + + Args: + user_ids: An iterable of user IDs. + """ + # Add user entries to the table, updating the presence_stream_id column if the user already + # exists in the table. + await self.db_pool.simple_upsert_many( + table="users_to_send_full_presence_to", + key_names=("user_id",), + key_values=[(user_id,) for user_id in user_ids], + value_names=("presence_stream_id",), + # We save the current presence stream ID token along with the user ID entry so + # that when a user /sync's, even if they syncing multiple times across separate + # devices at different times, each device will receive full presence once - when + # the presence stream ID in their sync token is less than the one in the table + # for their user ID. + value_values=( + (self._presence_id_gen.get_current_token(),) for _ in user_ids + ), + desc="add_users_to_send_full_presence_to", + ) + async def get_presence_for_all_users( self, include_offline: bool = True, -- cgit 1.5.1 From 6a8643ff3da905568e3f2ec047182753352e39d1 Mon Sep 17 00:00:00 2001 From: Marek Matys <57749215+thermaq@users.noreply.github.com> Date: Fri, 21 May 2021 13:02:06 +0200 Subject: Fixed removal of new presence stream states (#10014) Fixes: https://github.com/matrix-org/synapse/issues/9962 This is a fix for above problem. I fixed it by swaping the order of insertion of new records and deletion of old ones. This ensures that we don't delete fresh database records as we do deletes before inserts. Signed-off-by: Marek Matys --- changelog.d/10014.bugfix | 1 + synapse/storage/databases/main/presence.py | 18 +++++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) create mode 100644 changelog.d/10014.bugfix (limited to 'synapse/storage/databases/main/presence.py') diff --git a/changelog.d/10014.bugfix b/changelog.d/10014.bugfix new file mode 100644 index 0000000000..7cf3603f94 --- /dev/null +++ b/changelog.d/10014.bugfix @@ -0,0 +1 @@ +Fixed deletion of new presence stream states from database. diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 669a2af884..6a2baa7841 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -97,6 +97,15 @@ class PresenceStore(SQLBaseStore): ) txn.call_after(self._get_presence_for_user.invalidate, (state.user_id,)) + # Delete old rows to stop database from getting really big + sql = "DELETE FROM presence_stream WHERE stream_id < ? AND " + + for states in batch_iter(presence_states, 50): + clause, args = make_in_list_sql_clause( + self.database_engine, "user_id", [s.user_id for s in states] + ) + txn.execute(sql + clause, [stream_id] + list(args)) + # Actually insert new rows self.db_pool.simple_insert_many_txn( txn, @@ -117,15 +126,6 @@ class PresenceStore(SQLBaseStore): ], ) - # Delete old rows to stop database from getting really big - sql = "DELETE FROM presence_stream WHERE stream_id < ? AND " - - for states in batch_iter(presence_states, 50): - clause, args = make_in_list_sql_clause( - self.database_engine, "user_id", [s.user_id for s in states] - ) - txn.execute(sql + clause, [stream_id] + list(args)) - async def get_all_presence_updates( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, list]], int, bool]: -- cgit 1.5.1