diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index db22fab23e..669a2af884 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import TYPE_CHECKING, Dict, List, Tuple
+from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
from synapse.api.presence import PresenceState, UserPresenceState
from synapse.replication.tcp.streams import PresenceStream
@@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore):
db_conn, "presence_stream", "stream_id"
)
+ self.hs = hs
self._presence_on_startup = self._get_active_presence(db_conn)
presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
@@ -210,6 +211,61 @@ class PresenceStore(SQLBaseStore):
return {row["user_id"]: UserPresenceState(**row) for row in rows}
+ async def should_user_receive_full_presence_with_token(
+ self,
+ user_id: str,
+ from_token: int,
+ ) -> bool:
+ """Check whether the given user should receive full presence using the stream token
+ they're updating from.
+
+ Args:
+ user_id: The ID of the user to check.
+ from_token: The stream token included in their /sync token.
+
+ Returns:
+ True if the user should have full presence sent to them, False otherwise.
+ """
+
+ def _should_user_receive_full_presence_with_token_txn(txn):
+ sql = """
+ SELECT 1 FROM users_to_send_full_presence_to
+ WHERE user_id = ?
+ AND presence_stream_id >= ?
+ """
+ txn.execute(sql, (user_id, from_token))
+ return bool(txn.fetchone())
+
+ return await self.db_pool.runInteraction(
+ "should_user_receive_full_presence_with_token",
+ _should_user_receive_full_presence_with_token_txn,
+ )
+
+ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]):
+ """Adds to the list of users who should receive a full snapshot of presence
+ upon their next sync.
+
+ Args:
+ user_ids: An iterable of user IDs.
+ """
+ # Add user entries to the table, updating the presence_stream_id column if the user already
+ # exists in the table.
+ await self.db_pool.simple_upsert_many(
+ table="users_to_send_full_presence_to",
+ key_names=("user_id",),
+ key_values=[(user_id,) for user_id in user_ids],
+ value_names=("presence_stream_id",),
+ # We save the current presence stream ID token along with the user ID entry so
+ # that when a user /sync's, even if they syncing multiple times across separate
+ # devices at different times, each device will receive full presence once - when
+ # the presence stream ID in their sync token is less than the one in the table
+ # for their user ID.
+ value_values=(
+ (self._presence_id_gen.get_current_token(),) for _ in user_ids
+ ),
+ desc="add_users_to_send_full_presence_to",
+ )
+
async def get_presence_for_all_users(
self,
include_offline: bool = True,
diff --git a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql
new file mode 100644
index 0000000000..07b0f53ecf
--- /dev/null
+++ b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql
@@ -0,0 +1,34 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a table that keeps track of a list of users who should, upon their next
+-- sync request, receive presence for all currently online users that they are
+-- "interested" in.
+
+-- The motivation for a DB table over an in-memory list is so that this list
+-- can be added to and retrieved from by any worker. Specifically, we don't
+-- want to duplicate work across multiple sync workers.
+
+CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to(
+ -- The user ID to send full presence to.
+ user_id TEXT PRIMARY KEY,
+ -- A presence stream ID token - the current presence stream token when the row was last upserted.
+ -- If a user calls /sync and this token is part of the update they're to receive, we also include
+ -- full user presence in the response.
+ -- This allows multiple devices for a user to receive full presence whenever they next call /sync.
+ presence_stream_id BIGINT,
+ FOREIGN KEY (user_id)
+ REFERENCES users (name)
+);
\ No newline at end of file
|