summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/__init__.py47
-rw-r--r--synapse/storage/databases/main/presence.py92
-rw-r--r--synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql18
-rw-r--r--synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres20
4 files changed, 129 insertions, 48 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 5c50f5f950..49c7606d51 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -17,7 +17,6 @@
 import logging
 from typing import List, Optional, Tuple
 
-from synapse.api.constants import PresenceState
 from synapse.config.homeserver import HomeServerConfig
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.stats import UserSortOrder
@@ -51,7 +50,7 @@ from .media_repository import MediaRepositoryStore
 from .metrics import ServerMetricsStore
 from .monthly_active_users import MonthlyActiveUsersStore
 from .openid import OpenIdStore
-from .presence import PresenceStore, UserPresenceState
+from .presence import PresenceStore
 from .profile import ProfileStore
 from .purge_events import PurgeEventsStore
 from .push_rule import PushRuleStore
@@ -126,9 +125,6 @@ class DataStore(
         self._clock = hs.get_clock()
         self.database_engine = database.engine
 
-        self._presence_id_gen = StreamIdGenerator(
-            db_conn, "presence_stream", "stream_id"
-        )
         self._public_room_id_gen = StreamIdGenerator(
             db_conn, "public_room_list_stream", "stream_id"
         )
@@ -177,21 +173,6 @@ class DataStore(
 
         super().__init__(database, db_conn, hs)
 
-        self._presence_on_startup = self._get_active_presence(db_conn)
-
-        presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
-            db_conn,
-            "presence_stream",
-            entity_column="user_id",
-            stream_column="stream_id",
-            max_value=self._presence_id_gen.get_current_token(),
-        )
-        self.presence_stream_cache = StreamChangeCache(
-            "PresenceStreamChangeCache",
-            min_presence_val,
-            prefilled_cache=presence_cache_prefill,
-        )
-
         device_list_max = self._device_list_id_gen.get_current_token()
         self._device_list_stream_cache = StreamChangeCache(
             "DeviceListStreamChangeCache", device_list_max
@@ -238,32 +219,6 @@ class DataStore(
     def get_device_stream_token(self) -> int:
         return self._device_list_id_gen.get_current_token()
 
-    def take_presence_startup_info(self):
-        active_on_startup = self._presence_on_startup
-        self._presence_on_startup = None
-        return active_on_startup
-
-    def _get_active_presence(self, db_conn):
-        """Fetch non-offline presence from the database so that we can register
-        the appropriate time outs.
-        """
-
-        sql = (
-            "SELECT user_id, state, last_active_ts, last_federation_update_ts,"
-            " last_user_sync_ts, status_msg, currently_active FROM presence_stream"
-            " WHERE state != ?"
-        )
-
-        txn = db_conn.cursor()
-        txn.execute(sql, (PresenceState.OFFLINE,))
-        rows = self.db_pool.cursor_to_dict(txn)
-        txn.close()
-
-        for row in rows:
-            row["currently_active"] = bool(row["currently_active"])
-
-        return [UserPresenceState(**row) for row in rows]
-
     async def get_users(self) -> List[JsonDict]:
         """Function to retrieve a list of users in users table.
 
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index c207d917b1..db22fab23e 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -12,16 +12,69 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Dict, List, Tuple
+from typing import TYPE_CHECKING, Dict, List, Tuple
 
-from synapse.api.presence import UserPresenceState
+from synapse.api.presence import PresenceState, UserPresenceState
+from synapse.replication.tcp.streams import PresenceStream
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage.database import DatabasePool
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.types import Connection
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.iterutils import batch_iter
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class PresenceStore(SQLBaseStore):
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: Connection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self._can_persist_presence = (
+            hs.get_instance_name() in hs.config.worker.writers.presence
+        )
+
+        if isinstance(database.engine, PostgresEngine):
+            self._presence_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="presence_stream",
+                instance_name=self._instance_name,
+                tables=[("presence_stream", "instance_name", "stream_id")],
+                sequence_name="presence_stream_sequence",
+                writers=hs.config.worker.writers.to_device,
+            )
+        else:
+            self._presence_id_gen = StreamIdGenerator(
+                db_conn, "presence_stream", "stream_id"
+            )
+
+        self._presence_on_startup = self._get_active_presence(db_conn)
+
+        presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
+            db_conn,
+            "presence_stream",
+            entity_column="user_id",
+            stream_column="stream_id",
+            max_value=self._presence_id_gen.get_current_token(),
+        )
+        self.presence_stream_cache = StreamChangeCache(
+            "PresenceStreamChangeCache",
+            min_presence_val,
+            prefilled_cache=presence_cache_prefill,
+        )
+
     async def update_presence(self, presence_states):
+        assert self._can_persist_presence
+
         stream_ordering_manager = self._presence_id_gen.get_next_mult(
             len(presence_states)
         )
@@ -57,6 +110,7 @@ class PresenceStore(SQLBaseStore):
                     "last_user_sync_ts": state.last_user_sync_ts,
                     "status_msg": state.status_msg,
                     "currently_active": state.currently_active,
+                    "instance_name": self._instance_name,
                 }
                 for stream_id, state in zip(stream_orderings, presence_states)
             ],
@@ -216,3 +270,37 @@ class PresenceStore(SQLBaseStore):
 
     def get_current_presence_token(self):
         return self._presence_id_gen.get_current_token()
+
+    def _get_active_presence(self, db_conn: Connection):
+        """Fetch non-offline presence from the database so that we can register
+        the appropriate time outs.
+        """
+
+        sql = (
+            "SELECT user_id, state, last_active_ts, last_federation_update_ts,"
+            " last_user_sync_ts, status_msg, currently_active FROM presence_stream"
+            " WHERE state != ?"
+        )
+
+        txn = db_conn.cursor()
+        txn.execute(sql, (PresenceState.OFFLINE,))
+        rows = self.db_pool.cursor_to_dict(txn)
+        txn.close()
+
+        for row in rows:
+            row["currently_active"] = bool(row["currently_active"])
+
+        return [UserPresenceState(**row) for row in rows]
+
+    def take_presence_startup_info(self):
+        active_on_startup = self._presence_on_startup
+        self._presence_on_startup = None
+        return active_on_startup
+
+    def process_replication_rows(self, stream_name, instance_name, token, rows):
+        if stream_name == PresenceStream.NAME:
+            self._presence_id_gen.advance(instance_name, token)
+            for row in rows:
+                self.presence_stream_cache.entity_has_changed(row.user_id, token)
+                self._get_presence_for_user.invalidate((row.user_id,))
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql
new file mode 100644
index 0000000000..b6ba0bda1a
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a column to specify which instance wrote the row. Historic rows have
+-- `NULL`, which indicates that the master instance wrote them.
+ALTER TABLE presence_stream ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres
new file mode 100644
index 0000000000..02b182adf9
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS presence_stream_sequence;
+
+SELECT setval('presence_stream_sequence', (
+    SELECT COALESCE(MAX(stream_id), 1) FROM presence_stream
+));