summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/__init__.py33
-rw-r--r--synapse/storage/databases/main/deviceinbox.py252
-rw-r--r--synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql18
-rw-r--r--synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres25
4 files changed, 215 insertions, 113 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 701748f93b..c4de07a0a8 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -127,9 +127,6 @@ class DataStore(
         self._presence_id_gen = StreamIdGenerator(
             db_conn, "presence_stream", "stream_id"
         )
-        self._device_inbox_id_gen = StreamIdGenerator(
-            db_conn, "device_inbox", "stream_id"
-        )
         self._public_room_id_gen = StreamIdGenerator(
             db_conn, "public_room_list_stream", "stream_id"
         )
@@ -189,36 +186,6 @@ class DataStore(
             prefilled_cache=presence_cache_prefill,
         )
 
-        max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
-        device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
-            db_conn,
-            "device_inbox",
-            entity_column="user_id",
-            stream_column="stream_id",
-            max_value=max_device_inbox_id,
-            limit=1000,
-        )
-        self._device_inbox_stream_cache = StreamChangeCache(
-            "DeviceInboxStreamChangeCache",
-            min_device_inbox_id,
-            prefilled_cache=device_inbox_prefill,
-        )
-        # The federation outbox and the local device inbox uses the same
-        # stream_id generator.
-        device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
-            db_conn,
-            "device_federation_outbox",
-            entity_column="destination",
-            stream_column="stream_id",
-            max_value=max_device_inbox_id,
-            limit=1000,
-        )
-        self._device_federation_outbox_stream_cache = StreamChangeCache(
-            "DeviceFederationOutboxStreamChangeCache",
-            min_device_outbox_id,
-            prefilled_cache=device_outbox_prefill,
-        )
-
         device_list_max = self._device_list_id_gen.get_current_token()
         self._device_list_stream_cache = StreamChangeCache(
             "DeviceListStreamChangeCache", device_list_max
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index d42faa3f1f..58d3f71e45 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -17,15 +17,100 @@ import logging
 from typing import List, Tuple
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
+from synapse.replication.tcp.streams import ToDeviceStream
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.util import json_encoder
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 logger = logging.getLogger(__name__)
 
 
 class DeviceInboxWorkerStore(SQLBaseStore):
+    def __init__(self, database: DatabasePool, db_conn, hs):
+        super().__init__(database, db_conn, hs)
+
+        self._instance_name = hs.get_instance_name()
+
+        # Map of (user_id, device_id) to the last stream_id that has been
+        # deleted up to. This is so that we can no op deletions.
+        self._last_device_delete_cache = ExpiringCache(
+            cache_name="last_device_delete_cache",
+            clock=self._clock,
+            max_len=10000,
+            expiry_ms=30 * 60 * 1000,
+        )
+
+        if isinstance(database.engine, PostgresEngine):
+            self._can_write_to_device = (
+                self._instance_name in hs.config.worker.writers.to_device
+            )
+
+            self._device_inbox_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="to_device",
+                instance_name=self._instance_name,
+                table="device_inbox",
+                instance_column="instance_name",
+                id_column="stream_id",
+                sequence_name="device_inbox_sequence",
+                writers=hs.config.worker.writers.to_device,
+            )
+        else:
+            self._can_write_to_device = True
+            self._device_inbox_id_gen = StreamIdGenerator(
+                db_conn, "device_inbox", "stream_id"
+            )
+
+        max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
+        device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
+            db_conn,
+            "device_inbox",
+            entity_column="user_id",
+            stream_column="stream_id",
+            max_value=max_device_inbox_id,
+            limit=1000,
+        )
+        self._device_inbox_stream_cache = StreamChangeCache(
+            "DeviceInboxStreamChangeCache",
+            min_device_inbox_id,
+            prefilled_cache=device_inbox_prefill,
+        )
+
+        # The federation outbox and the local device inbox uses the same
+        # stream_id generator.
+        device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
+            db_conn,
+            "device_federation_outbox",
+            entity_column="destination",
+            stream_column="stream_id",
+            max_value=max_device_inbox_id,
+            limit=1000,
+        )
+        self._device_federation_outbox_stream_cache = StreamChangeCache(
+            "DeviceFederationOutboxStreamChangeCache",
+            min_device_outbox_id,
+            prefilled_cache=device_outbox_prefill,
+        )
+
+    def process_replication_rows(self, stream_name, instance_name, token, rows):
+        if stream_name == ToDeviceStream.NAME:
+            self._device_inbox_id_gen.advance(instance_name, token)
+            for row in rows:
+                if row.entity.startswith("@"):
+                    self._device_inbox_stream_cache.entity_has_changed(
+                        row.entity, token
+                    )
+                else:
+                    self._device_federation_outbox_stream_cache.entity_has_changed(
+                        row.entity, token
+                    )
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
+
     def get_to_device_stream_token(self):
         return self._device_inbox_id_gen.get_current_token()
 
@@ -278,52 +363,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "get_all_new_device_messages", get_all_new_device_messages_txn
         )
 
-
-class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
-    DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
-
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        super().__init__(database, db_conn, hs)
-
-        self.db_pool.updates.register_background_index_update(
-            "device_inbox_stream_index",
-            index_name="device_inbox_stream_id_user_id",
-            table="device_inbox",
-            columns=["stream_id", "user_id"],
-        )
-
-        self.db_pool.updates.register_background_update_handler(
-            self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
-        )
-
-    async def _background_drop_index_device_inbox(self, progress, batch_size):
-        def reindex_txn(conn):
-            txn = conn.cursor()
-            txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
-            txn.close()
-
-        await self.db_pool.runWithConnection(reindex_txn)
-
-        await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)
-
-        return 1
-
-
-class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
-    DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
-
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        super().__init__(database, db_conn, hs)
-
-        # Map of (user_id, device_id) to the last stream_id that has been
-        # deleted up to. This is so that we can no op deletions.
-        self._last_device_delete_cache = ExpiringCache(
-            cache_name="last_device_delete_cache",
-            clock=self._clock,
-            max_len=10000,
-            expiry_ms=30 * 60 * 1000,
-        )
-
     @trace
     async def add_messages_to_device_inbox(
         self,
@@ -342,6 +381,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             The new stream_id.
         """
 
+        assert self._can_write_to_device
+
         def add_messages_txn(txn, now_ms, stream_id):
             # Add the local messages directly to the local inbox.
             self._add_messages_to_local_device_inbox_txn(
@@ -351,16 +392,20 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             # Add the remote messages to the federation outbox.
             # We'll send them to a remote server when we next send a
             # federation transaction to that destination.
-            sql = (
-                "INSERT INTO device_federation_outbox"
-                " (destination, stream_id, queued_ts, messages_json)"
-                " VALUES (?,?,?,?)"
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="device_federation_outbox",
+                values=[
+                    {
+                        "destination": destination,
+                        "stream_id": stream_id,
+                        "queued_ts": now_ms,
+                        "messages_json": json_encoder.encode(edu),
+                        "instance_name": self._instance_name,
+                    }
+                    for destination, edu in remote_messages_by_destination.items()
+                ],
             )
-            rows = []
-            for destination, edu in remote_messages_by_destination.items():
-                edu_json = json_encoder.encode(edu)
-                rows.append((destination, stream_id, now_ms, edu_json))
-            txn.executemany(sql, rows)
 
         async with self._device_inbox_id_gen.get_next() as stream_id:
             now_ms = self.clock.time_msec()
@@ -379,6 +424,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
     async def add_messages_from_remote_to_device_inbox(
         self, origin: str, message_id: str, local_messages_by_user_then_device: dict
     ) -> int:
+        assert self._can_write_to_device
+
         def add_messages_txn(txn, now_ms, stream_id):
             # Check if we've already inserted a matching message_id for that
             # origin. This can happen if the origin doesn't receive our
@@ -427,38 +474,45 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
     def _add_messages_to_local_device_inbox_txn(
         self, txn, stream_id, messages_by_user_then_device
     ):
+        assert self._can_write_to_device
+
         local_by_user_then_device = {}
         for user_id, messages_by_device in messages_by_user_then_device.items():
             messages_json_for_user = {}
             devices = list(messages_by_device.keys())
             if len(devices) == 1 and devices[0] == "*":
                 # Handle wildcard device_ids.
-                sql = "SELECT device_id FROM devices WHERE user_id = ?"
-                txn.execute(sql, (user_id,))
+                devices = self.db_pool.simple_select_onecol_txn(
+                    txn,
+                    table="devices",
+                    keyvalues={"user_id": user_id},
+                    retcol="device_id",
+                )
+
                 message_json = json_encoder.encode(messages_by_device["*"])
-                for row in txn:
+                for device_id in devices:
                     # Add the message for all devices for this user on this
                     # server.
-                    device = row[0]
-                    messages_json_for_user[device] = message_json
+                    messages_json_for_user[device_id] = message_json
             else:
                 if not devices:
                     continue
 
-                clause, args = make_in_list_sql_clause(
-                    txn.database_engine, "device_id", devices
+                rows = self.db_pool.simple_select_many_txn(
+                    txn,
+                    table="devices",
+                    keyvalues={"user_id": user_id},
+                    column="device_id",
+                    iterable=devices,
+                    retcols=("device_id",),
                 )
-                sql = "SELECT device_id FROM devices WHERE user_id = ? AND " + clause
 
-                # TODO: Maybe this needs to be done in batches if there are
-                # too many local devices for a given user.
-                txn.execute(sql, [user_id] + list(args))
-                for row in txn:
+                for row in rows:
                     # Only insert into the local inbox if the device exists on
                     # this server
-                    device = row[0]
-                    message_json = json_encoder.encode(messages_by_device[device])
-                    messages_json_for_user[device] = message_json
+                    device_id = row["device_id"]
+                    message_json = json_encoder.encode(messages_by_device[device_id])
+                    messages_json_for_user[device_id] = message_json
 
             if messages_json_for_user:
                 local_by_user_then_device[user_id] = messages_json_for_user
@@ -466,14 +520,52 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
         if not local_by_user_then_device:
             return
 
-        sql = (
-            "INSERT INTO device_inbox"
-            " (user_id, device_id, stream_id, message_json)"
-            " VALUES (?,?,?,?)"
+        self.db_pool.simple_insert_many_txn(
+            txn,
+            table="device_inbox",
+            values=[
+                {
+                    "user_id": user_id,
+                    "device_id": device_id,
+                    "stream_id": stream_id,
+                    "message_json": message_json,
+                    "instance_name": self._instance_name,
+                }
+                for user_id, messages_by_device in local_by_user_then_device.items()
+                for device_id, message_json in messages_by_device.items()
+            ],
         )
-        rows = []
-        for user_id, messages_by_device in local_by_user_then_device.items():
-            for device_id, message_json in messages_by_device.items():
-                rows.append((user_id, device_id, stream_id, message_json))
 
-        txn.executemany(sql, rows)
+
+class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
+    DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
+
+    def __init__(self, database: DatabasePool, db_conn, hs):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_index_update(
+            "device_inbox_stream_index",
+            index_name="device_inbox_stream_id_user_id",
+            table="device_inbox",
+            columns=["stream_id", "user_id"],
+        )
+
+        self.db_pool.updates.register_background_update_handler(
+            self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
+        )
+
+    async def _background_drop_index_device_inbox(self, progress, batch_size):
+        def reindex_txn(conn):
+            txn = conn.cursor()
+            txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
+            txn.close()
+
+        await self.db_pool.runWithConnection(reindex_txn)
+
+        await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)
+
+        return 1
+
+
+class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
+    pass
diff --git a/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql
new file mode 100644
index 0000000000..d781a92fec
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE device_inbox ADD COLUMN instance_name TEXT;
+ALTER TABLE device_federation_inbox ADD COLUMN instance_name TEXT;
+ALTER TABLE device_federation_outbox ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres
new file mode 100644
index 0000000000..45a845a3a5
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres
@@ -0,0 +1,25 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS device_inbox_sequence;
+
+-- We need to take the max across both device_inbox and device_federation_outbox
+-- tables as they share the ID generator
+SELECT setval('device_inbox_sequence', (
+    SELECT GREATEST(
+        (SELECT COALESCE(MAX(stream_id), 1) FROM device_inbox),
+        (SELECT COALESCE(MAX(stream_id), 1) FROM device_federation_outbox)
+    )
+));