summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-05-05 16:02:36 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2021-05-05 16:02:38 +0100
commit46eccf182697c158b0ac79045a5ebf8871de5924 (patch)
tree8bd4f813cd5039ddf6e3847789fd7869221f6f57
parentRemove users_to_send_full_presence_to table culling; add fk for user_id (diff)
downloadsynapse-46eccf182697c158b0ac79045a5ebf8871de5924.tar.xz
Add presence_stream_id column to users_to_send_full_presence_to table
We now return full presence in a /sync response based on the provided sync token,
rather than if the user has received the response already. This is necessary in
circumstances where users have more than one device - which they often do!

This comment also modifies the spot in PresenceHandler where we check if
the user should receive all presence. We only need to make the check if
from_key is not None. If from_key is None, the user will be receiving
all presence states anyways.
-rw-r--r--synapse/handlers/presence.py34
-rw-r--r--synapse/storage/databases/main/presence.py50
-rw-r--r--synapse/storage/databases/main/schema/delta/59/12users_to_send_full_presence_to.sql7
3 files changed, 43 insertions, 48 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d1d8cfb9f4..2cc229abcc 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1412,22 +1412,22 @@ class PresenceEventSource:
         user_id = user.to_string()
         stream_change_cache = self.store.presence_stream_cache
 
-        # Check if this user should receive all current, online user presence
-        user_in_users_to_send_full_presence_to = (
-            await self.store.is_user_in_users_to_send_full_presence_to(user_id)
-        )
-
         with Measure(self.clock, "presence.get_new_events"):
-            if user_in_users_to_send_full_presence_to:
-                # This user has been specified by a module to receive all current, online
-                # user presence. Removing from_key and setting include_offline to false
-                # will do effectively this.
-                from_key = None
-                include_offline = False
-
             if from_key is not None:
                 from_key = int(from_key)
 
+                # Check if this user should receive all current, online user presence. We only
+                # bother to do this if from_key is set, as otherwise the user will receive all
+                # user presence anyways.
+                if await self.store.should_user_receive_full_presence_with_token(
+                    user_id, from_key
+                ):
+                    # This user has been specified by a module to receive all current, online
+                    # user presence. Removing from_key and setting include_offline to false
+                    # will do effectively this.
+                    from_key = None
+                    include_offline = False
+
             max_token = self.store.get_current_presence_token()
             if from_key == max_token:
                 # This is necessary as due to the way stream ID generators work
@@ -1461,12 +1461,6 @@ class PresenceEventSource:
                     user_id, include_offline, from_key
                 )
 
-                # Remove the user from the list of users to receive all presence
-                if user_in_users_to_send_full_presence_to:
-                    await self.store.remove_user_from_users_to_send_full_presence_to(
-                        user_id
-                    )
-
                 return presence_updates, max_token
 
             # Make mypy happy. users_interested_in should now be a set
@@ -1516,10 +1510,6 @@ class PresenceEventSource:
             )
             presence_updates = list(users_to_state.values())
 
-        # Remove the user from the list of users to receive all presence
-        if user_in_users_to_send_full_presence_to:
-            await self.store.remove_user_from_users_to_send_full_presence_to(user_id)
-
         if not include_offline:
             # Filter out offline presence states
             presence_updates = self._filter_offline_presence_state(presence_updates)
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index bd7d60be9c..3e72d10062 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -210,62 +210,62 @@ class PresenceStore(SQLBaseStore):
 
         return {row["user_id"]: UserPresenceState(**row) for row in rows}
 
-    async def is_user_in_users_to_send_full_presence_to(self, user_id: str) -> bool:
-        """Check whether the given user is one we need to send full presence to.
+    async def should_user_receive_full_presence_with_token(
+        self,
+        user_id: str,
+        from_token: int,
+    ) -> bool:
+        """Check whether the given user should receive full presence using the stream token
+        they're updating from.
 
         Args:
             user_id: The ID of the user to check.
+            from_token: The stream token included in their /sync token.
 
         Returns:
             True if the user should have full presence sent to them, False otherwise.
         """
 
-        def _is_user_in_users_to_send_full_presence_to_txn(txn):
+        def _should_user_receive_full_presence_with_token_txn(txn):
             sql = """
                 SELECT 1 FROM users_to_send_full_presence_to
                 WHERE user_id = ?
+                AND presence_stream_id >= ?
             """
-            txn.execute(sql, (user_id,))
+            txn.execute(sql, (user_id, from_token))
             return bool(txn.fetchone())
 
         return await self.db_pool.runInteraction(
-            "is_user_in_users_to_send_full_presence_to",
-            _is_user_in_users_to_send_full_presence_to_txn,
+            "should_user_receive_full_presence_with_token",
+            _should_user_receive_full_presence_with_token_txn,
         )
 
     async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]):
         """Adds to the list of users who should receive a full snapshot of presence
         upon their next sync.
 
-        Entries are kept for at least USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS,
-        and are culled whenever this method is called.
-
         Args:
             user_ids: An iterable of user IDs.
         """
-        # Add user entries to the table
+
+        # Add user entries to the table, updating the presence_stream_id column if the user already
+        # exists in the table.
         await self.db_pool.simple_upsert_many(
             table="users_to_send_full_presence_to",
             key_names=("user_id",),
             key_values=[(user_id,) for user_id in user_ids],
-            value_names=(),
-            value_values=(),
+            value_names=("presence_stream_id",),
+            # We save the current presence stream ID token along with the user ID entry so
+            # that when a user /sync's, even if they syncing multiple times across separate
+            # devices at different times, each device will receive full presence once - when
+            # the presence stream ID in their sync token is less than the one in the table
+            # for their user ID.
+            value_values=(
+                (self._presence_id_gen.get_current_token(),) for _ in user_ids
+            ),
             desc="add_users_to_send_full_presence_to",
         )
 
-    async def remove_user_from_users_to_send_full_presence_to(self, user_id: str):
-        """Removes a user from those to send full presence to. This should only be done
-        once we have sent full presence to them following a successful sync.
-
-        Args:
-            user_id: The ID of the user to remove from the table.
-        """
-        await self.db_pool.simple_delete(
-            table="users_to_send_full_presence_to",
-            keyvalues={"user_id": user_id},
-            desc="remove_user_from_users_to_send_full_presence_to",
-        )
-
     async def get_presence_for_all_users(
         self,
         include_offline: bool = True,
diff --git a/synapse/storage/databases/main/schema/delta/59/12users_to_send_full_presence_to.sql b/synapse/storage/databases/main/schema/delta/59/12users_to_send_full_presence_to.sql
index c01c4e177a..ab37a48ed9 100644
--- a/synapse/storage/databases/main/schema/delta/59/12users_to_send_full_presence_to.sql
+++ b/synapse/storage/databases/main/schema/delta/59/12users_to_send_full_presence_to.sql
@@ -24,6 +24,11 @@
 CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to(
     -- The user ID to send full presence to.
     user_id TEXT PRIMARY KEY,
+    -- A presence stream ID token - the current presence stream token when the row was last upserted.
+    -- If a user calls /sync and this token is part of the update they're to receive, we also include
+    -- full user presence in the response.
+    -- This allows multiple devices for a user to receive full presence whenever they next call /sync.
+    presence_stream_id BIGINT,
     FOREIGN KEY (user_id)
         REFERENCES users (name)
-);
\ No newline at end of file
+);