diff --git a/changelog.d/12885.misc b/changelog.d/12885.misc
new file mode 100644
index 0000000000..2524056307
--- /dev/null
+++ b/changelog.d/12885.misc
@@ -0,0 +1 @@
+Reduce database load of `/sync` when presence is enabled.
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index b47c511450..9769a18a9d 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple, cast
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, cast
from synapse.api.presence import PresenceState, UserPresenceState
from synapse.replication.tcp.streams import PresenceStream
@@ -22,6 +22,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
+from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import (
@@ -56,7 +57,7 @@ class PresenceBackgroundUpdateStore(SQLBaseStore):
)
-class PresenceStore(PresenceBackgroundUpdateStore):
+class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore):
def __init__(
self,
database: DatabasePool,
@@ -281,20 +282,30 @@ class PresenceStore(PresenceBackgroundUpdateStore):
True if the user should have full presence sent to them, False otherwise.
"""
- def _should_user_receive_full_presence_with_token_txn(
- txn: LoggingTransaction,
- ) -> bool:
- sql = """
- SELECT 1 FROM users_to_send_full_presence_to
- WHERE user_id = ?
- AND presence_stream_id >= ?
- """
- txn.execute(sql, (user_id, from_token))
- return bool(txn.fetchone())
+ token = await self._get_full_presence_stream_token_for_user(user_id)
+ if token is None:
+ return False
- return await self.db_pool.runInteraction(
- "should_user_receive_full_presence_with_token",
- _should_user_receive_full_presence_with_token_txn,
+ return from_token <= token
+
+ @cached()
+ async def _get_full_presence_stream_token_for_user(
+ self, user_id: str
+ ) -> Optional[int]:
+ """Get the presence token corresponding to the last full presence update
+ for this user.
+
+ If the user presents a sync token with a presence stream token at least
+ as old as the result, then we need to send them a full presence update.
+
+ If this user has never needed a full presence update, returns `None`.
+ """
+ return await self.db_pool.simple_select_one_onecol(
+ table="users_to_send_full_presence_to",
+ keyvalues={"user_id": user_id},
+ retcol="presence_stream_id",
+ allow_none=True,
+ desc="_get_full_presence_stream_token_for_user",
)
async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> None:
@@ -307,18 +318,28 @@ class PresenceStore(PresenceBackgroundUpdateStore):
# Add user entries to the table, updating the presence_stream_id column if the user already
# exists in the table.
presence_stream_id = self._presence_id_gen.get_current_token()
- await self.db_pool.simple_upsert_many(
- table="users_to_send_full_presence_to",
- key_names=("user_id",),
- key_values=[(user_id,) for user_id in user_ids],
- value_names=("presence_stream_id",),
- # We save the current presence stream ID token along with the user ID entry so
- # that when a user /sync's, even if they syncing multiple times across separate
- # devices at different times, each device will receive full presence once - when
- # the presence stream ID in their sync token is less than the one in the table
- # for their user ID.
- value_values=[(presence_stream_id,) for _ in user_ids],
- desc="add_users_to_send_full_presence_to",
+
+ def _add_users_to_send_full_presence_to(txn: LoggingTransaction) -> None:
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="users_to_send_full_presence_to",
+ key_names=("user_id",),
+ key_values=[(user_id,) for user_id in user_ids],
+ value_names=("presence_stream_id",),
+ # We save the current presence stream ID token along with the user ID entry so
+ # that when a user /sync's, even if they syncing multiple times across separate
+ # devices at different times, each device will receive full presence once - when
+ # the presence stream ID in their sync token is less than the one in the table
+ # for their user ID.
+ value_values=[(presence_stream_id,) for _ in user_ids],
+ )
+ for user_id in user_ids:
+ self._invalidate_cache_and_stream(
+ txn, self._get_full_presence_stream_token_for_user, (user_id,)
+ )
+
+ return await self.db_pool.runInteraction(
+ "add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
)
async def get_presence_for_all_users(
|