summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2022-05-31 14:01:05 +0100
committerGitHub <noreply@github.com>2022-05-31 13:01:05 +0000
commitc8684e67924fceed44bcbc4a607502764905ba1e (patch)
treec3b71b1fd4c90555d0794c881429913e5da5c512 /synapse/storage/databases
parentStop reading from `event_edges.room_id`. (#12914) (diff)
downloadsynapse-c8684e67924fceed44bcbc4a607502764905ba1e.tar.xz
Reduce DB load of /sync when using presence (#12885)
While the query was fast, we were calling it *a lot*.
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/presence.py75
1 files changed, 48 insertions, 27 deletions
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index b47c511450..9769a18a9d 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple, cast
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, cast
 
 from synapse.api.presence import PresenceState, UserPresenceState
 from synapse.replication.tcp.streams import PresenceStream
@@ -22,6 +22,7 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
+from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Connection
 from synapse.storage.util.id_generators import (
@@ -56,7 +57,7 @@ class PresenceBackgroundUpdateStore(SQLBaseStore):
         )
 
 
-class PresenceStore(PresenceBackgroundUpdateStore):
+class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -281,20 +282,30 @@ class PresenceStore(PresenceBackgroundUpdateStore):
             True if the user should have full presence sent to them, False otherwise.
         """
 
-        def _should_user_receive_full_presence_with_token_txn(
-            txn: LoggingTransaction,
-        ) -> bool:
-            sql = """
-                SELECT 1 FROM users_to_send_full_presence_to
-                WHERE user_id = ?
-                AND presence_stream_id >= ?
-            """
-            txn.execute(sql, (user_id, from_token))
-            return bool(txn.fetchone())
+        token = await self._get_full_presence_stream_token_for_user(user_id)
+        if token is None:
+            return False
 
-        return await self.db_pool.runInteraction(
-            "should_user_receive_full_presence_with_token",
-            _should_user_receive_full_presence_with_token_txn,
+        return from_token <= token
+
+    @cached()
+    async def _get_full_presence_stream_token_for_user(
+        self, user_id: str
+    ) -> Optional[int]:
+        """Get the presence token corresponding to the last full presence update
+        for this user.
+
+        If the user presents a sync token with a presence stream token at least
+        as old as the result, then we need to send them a full presence update.
+
+        If this user has never needed a full presence update, returns `None`.
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            table="users_to_send_full_presence_to",
+            keyvalues={"user_id": user_id},
+            retcol="presence_stream_id",
+            allow_none=True,
+            desc="_get_full_presence_stream_token_for_user",
         )
 
     async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> None:
@@ -307,18 +318,28 @@ class PresenceStore(PresenceBackgroundUpdateStore):
         # Add user entries to the table, updating the presence_stream_id column if the user already
         # exists in the table.
         presence_stream_id = self._presence_id_gen.get_current_token()
-        await self.db_pool.simple_upsert_many(
-            table="users_to_send_full_presence_to",
-            key_names=("user_id",),
-            key_values=[(user_id,) for user_id in user_ids],
-            value_names=("presence_stream_id",),
-            # We save the current presence stream ID token along with the user ID entry so
-            # that when a user /sync's, even if they syncing multiple times across separate
-            # devices at different times, each device will receive full presence once - when
-            # the presence stream ID in their sync token is less than the one in the table
-            # for their user ID.
-            value_values=[(presence_stream_id,) for _ in user_ids],
-            desc="add_users_to_send_full_presence_to",
+
+        def _add_users_to_send_full_presence_to(txn: LoggingTransaction) -> None:
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="users_to_send_full_presence_to",
+                key_names=("user_id",),
+                key_values=[(user_id,) for user_id in user_ids],
+                value_names=("presence_stream_id",),
+                # We save the current presence stream ID token along with the user ID entry so
+                # that when a user /sync's, even if they syncing multiple times across separate
+                # devices at different times, each device will receive full presence once - when
+                # the presence stream ID in their sync token is less than the one in the table
+                # for their user ID.
+                value_values=[(presence_stream_id,) for _ in user_ids],
+            )
+            for user_id in user_ids:
+                self._invalidate_cache_and_stream(
+                    txn, self._get_full_presence_stream_token_for_user, (user_id,)
+                )
+
+        return await self.db_pool.runInteraction(
+            "add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
         )
 
     async def get_presence_for_all_users(