summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/presence.py58
-rw-r--r--synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql34
2 files changed, 91 insertions, 1 deletions
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index db22fab23e..669a2af884 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import TYPE_CHECKING, Dict, List, Tuple
+from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
 from synapse.api.presence import PresenceState, UserPresenceState
 from synapse.replication.tcp.streams import PresenceStream
@@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore):
                 db_conn, "presence_stream", "stream_id"
             )
 
+        self.hs = hs
         self._presence_on_startup = self._get_active_presence(db_conn)
 
         presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
@@ -210,6 +211,61 @@ class PresenceStore(SQLBaseStore):
 
         return {row["user_id"]: UserPresenceState(**row) for row in rows}
 
+    async def should_user_receive_full_presence_with_token(
+        self,
+        user_id: str,
+        from_token: int,
+    ) -> bool:
+        """Check whether the given user should receive full presence using the stream token
+        they're updating from.
+
+        Args:
+            user_id: The ID of the user to check.
+            from_token: The stream token included in their /sync token.
+
+        Returns:
+            True if the user should have full presence sent to them, False otherwise.
+        """
+
+        def _should_user_receive_full_presence_with_token_txn(txn):
+            sql = """
+                SELECT 1 FROM users_to_send_full_presence_to
+                WHERE user_id = ?
+                AND presence_stream_id >= ?
+            """
+            txn.execute(sql, (user_id, from_token))
+            return bool(txn.fetchone())
+
+        return await self.db_pool.runInteraction(
+            "should_user_receive_full_presence_with_token",
+            _should_user_receive_full_presence_with_token_txn,
+        )
+
+    async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]):
+        """Adds to the list of users who should receive a full snapshot of presence
+        upon their next sync.
+
+        Args:
+            user_ids: An iterable of user IDs.
+        """
+        # Add user entries to the table, updating the presence_stream_id column if the user already
+        # exists in the table.
+        await self.db_pool.simple_upsert_many(
+            table="users_to_send_full_presence_to",
+            key_names=("user_id",),
+            key_values=[(user_id,) for user_id in user_ids],
+            value_names=("presence_stream_id",),
+            # We save the current presence stream ID token along with the user ID entry so
+            # that when a user /sync's, even if they syncing multiple times across separate
+            # devices at different times, each device will receive full presence once - when
+            # the presence stream ID in their sync token is less than the one in the table
+            # for their user ID.
+            value_values=(
+                (self._presence_id_gen.get_current_token(),) for _ in user_ids
+            ),
+            desc="add_users_to_send_full_presence_to",
+        )
+
     async def get_presence_for_all_users(
         self,
         include_offline: bool = True,
diff --git a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql
new file mode 100644
index 0000000000..07b0f53ecf
--- /dev/null
+++ b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql
@@ -0,0 +1,34 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a table that keeps track of a list of users who should, upon their next
+-- sync request, receive presence for all currently online users that they are
+-- "interested" in.
+
+-- The motivation for a DB table over an in-memory list is so that this list
+-- can be added to and retrieved from by any worker. Specifically, we don't
+-- want to duplicate work across multiple sync workers.
+
+CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to(
+    -- The user ID to send full presence to.
+    user_id TEXT PRIMARY KEY,
+    -- A presence stream ID token - the current presence stream token when the row was last upserted.
+    -- If a user calls /sync and this token is part of the update they're to receive, we also include
+    -- full user presence in the response.
+    -- This allows multiple devices for a user to receive full presence whenever they next call /sync.
+    presence_stream_id BIGINT,
+    FOREIGN KEY (user_id)
+        REFERENCES users (name)
+);
\ No newline at end of file