diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 701748f93b..c4de07a0a8 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -127,9 +127,6 @@ class DataStore(
self._presence_id_gen = StreamIdGenerator(
db_conn, "presence_stream", "stream_id"
)
- self._device_inbox_id_gen = StreamIdGenerator(
- db_conn, "device_inbox", "stream_id"
- )
self._public_room_id_gen = StreamIdGenerator(
db_conn, "public_room_list_stream", "stream_id"
)
@@ -189,36 +186,6 @@ class DataStore(
prefilled_cache=presence_cache_prefill,
)
- max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
- device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
- db_conn,
- "device_inbox",
- entity_column="user_id",
- stream_column="stream_id",
- max_value=max_device_inbox_id,
- limit=1000,
- )
- self._device_inbox_stream_cache = StreamChangeCache(
- "DeviceInboxStreamChangeCache",
- min_device_inbox_id,
- prefilled_cache=device_inbox_prefill,
- )
- # The federation outbox and the local device inbox uses the same
- # stream_id generator.
- device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
- db_conn,
- "device_federation_outbox",
- entity_column="destination",
- stream_column="stream_id",
- max_value=max_device_inbox_id,
- limit=1000,
- )
- self._device_federation_outbox_stream_cache = StreamChangeCache(
- "DeviceFederationOutboxStreamChangeCache",
- min_device_outbox_id,
- prefilled_cache=device_outbox_prefill,
- )
-
device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache", device_list_max
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index d42faa3f1f..58d3f71e45 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -17,15 +17,100 @@ import logging
from typing import List, Tuple
from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
+from synapse.replication.tcp.streams import ToDeviceStream
+from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__)
class DeviceInboxWorkerStore(SQLBaseStore):
+ def __init__(self, database: DatabasePool, db_conn, hs):
+ super().__init__(database, db_conn, hs)
+
+ self._instance_name = hs.get_instance_name()
+
+ # Map of (user_id, device_id) to the last stream_id that has been
+ # deleted up to. This is so that we can no op deletions.
+ self._last_device_delete_cache = ExpiringCache(
+ cache_name="last_device_delete_cache",
+ clock=self._clock,
+ max_len=10000,
+ expiry_ms=30 * 60 * 1000,
+ )
+
+ if isinstance(database.engine, PostgresEngine):
+ self._can_write_to_device = (
+ self._instance_name in hs.config.worker.writers.to_device
+ )
+
+ self._device_inbox_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ stream_name="to_device",
+ instance_name=self._instance_name,
+ table="device_inbox",
+ instance_column="instance_name",
+ id_column="stream_id",
+ sequence_name="device_inbox_sequence",
+ writers=hs.config.worker.writers.to_device,
+ )
+ else:
+ self._can_write_to_device = True
+ self._device_inbox_id_gen = StreamIdGenerator(
+ db_conn, "device_inbox", "stream_id"
+ )
+
+ max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
+ device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
+ db_conn,
+ "device_inbox",
+ entity_column="user_id",
+ stream_column="stream_id",
+ max_value=max_device_inbox_id,
+ limit=1000,
+ )
+ self._device_inbox_stream_cache = StreamChangeCache(
+ "DeviceInboxStreamChangeCache",
+ min_device_inbox_id,
+ prefilled_cache=device_inbox_prefill,
+ )
+
+ # The federation outbox and the local device inbox uses the same
+ # stream_id generator.
+ device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
+ db_conn,
+ "device_federation_outbox",
+ entity_column="destination",
+ stream_column="stream_id",
+ max_value=max_device_inbox_id,
+ limit=1000,
+ )
+ self._device_federation_outbox_stream_cache = StreamChangeCache(
+ "DeviceFederationOutboxStreamChangeCache",
+ min_device_outbox_id,
+ prefilled_cache=device_outbox_prefill,
+ )
+
+ def process_replication_rows(self, stream_name, instance_name, token, rows):
+ if stream_name == ToDeviceStream.NAME:
+ self._device_inbox_id_gen.advance(instance_name, token)
+ for row in rows:
+ if row.entity.startswith("@"):
+ self._device_inbox_stream_cache.entity_has_changed(
+ row.entity, token
+ )
+ else:
+ self._device_federation_outbox_stream_cache.entity_has_changed(
+ row.entity, token
+ )
+ return super().process_replication_rows(stream_name, instance_name, token, rows)
+
def get_to_device_stream_token(self):
return self._device_inbox_id_gen.get_current_token()
@@ -278,52 +363,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"get_all_new_device_messages", get_all_new_device_messages_txn
)
-
-class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
- DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
-
- def __init__(self, database: DatabasePool, db_conn, hs):
- super().__init__(database, db_conn, hs)
-
- self.db_pool.updates.register_background_index_update(
- "device_inbox_stream_index",
- index_name="device_inbox_stream_id_user_id",
- table="device_inbox",
- columns=["stream_id", "user_id"],
- )
-
- self.db_pool.updates.register_background_update_handler(
- self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
- )
-
- async def _background_drop_index_device_inbox(self, progress, batch_size):
- def reindex_txn(conn):
- txn = conn.cursor()
- txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
- txn.close()
-
- await self.db_pool.runWithConnection(reindex_txn)
-
- await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)
-
- return 1
-
-
-class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
- DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
-
- def __init__(self, database: DatabasePool, db_conn, hs):
- super().__init__(database, db_conn, hs)
-
- # Map of (user_id, device_id) to the last stream_id that has been
- # deleted up to. This is so that we can no op deletions.
- self._last_device_delete_cache = ExpiringCache(
- cache_name="last_device_delete_cache",
- clock=self._clock,
- max_len=10000,
- expiry_ms=30 * 60 * 1000,
- )
-
@trace
async def add_messages_to_device_inbox(
self,
@@ -342,6 +381,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
The new stream_id.
"""
+ assert self._can_write_to_device
+
def add_messages_txn(txn, now_ms, stream_id):
# Add the local messages directly to the local inbox.
self._add_messages_to_local_device_inbox_txn(
@@ -351,16 +392,20 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
# Add the remote messages to the federation outbox.
# We'll send them to a remote server when we next send a
# federation transaction to that destination.
- sql = (
- "INSERT INTO device_federation_outbox"
- " (destination, stream_id, queued_ts, messages_json)"
- " VALUES (?,?,?,?)"
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="device_federation_outbox",
+ values=[
+ {
+ "destination": destination,
+ "stream_id": stream_id,
+ "queued_ts": now_ms,
+ "messages_json": json_encoder.encode(edu),
+ "instance_name": self._instance_name,
+ }
+ for destination, edu in remote_messages_by_destination.items()
+ ],
)
- rows = []
- for destination, edu in remote_messages_by_destination.items():
- edu_json = json_encoder.encode(edu)
- rows.append((destination, stream_id, now_ms, edu_json))
- txn.executemany(sql, rows)
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
@@ -379,6 +424,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
async def add_messages_from_remote_to_device_inbox(
self, origin: str, message_id: str, local_messages_by_user_then_device: dict
) -> int:
+ assert self._can_write_to_device
+
def add_messages_txn(txn, now_ms, stream_id):
# Check if we've already inserted a matching message_id for that
# origin. This can happen if the origin doesn't receive our
@@ -427,38 +474,45 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
def _add_messages_to_local_device_inbox_txn(
self, txn, stream_id, messages_by_user_then_device
):
+ assert self._can_write_to_device
+
local_by_user_then_device = {}
for user_id, messages_by_device in messages_by_user_then_device.items():
messages_json_for_user = {}
devices = list(messages_by_device.keys())
if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids.
- sql = "SELECT device_id FROM devices WHERE user_id = ?"
- txn.execute(sql, (user_id,))
+ devices = self.db_pool.simple_select_onecol_txn(
+ txn,
+ table="devices",
+ keyvalues={"user_id": user_id},
+ retcol="device_id",
+ )
+
message_json = json_encoder.encode(messages_by_device["*"])
- for row in txn:
+ for device_id in devices:
# Add the message for all devices for this user on this
# server.
- device = row[0]
- messages_json_for_user[device] = message_json
+ messages_json_for_user[device_id] = message_json
else:
if not devices:
continue
- clause, args = make_in_list_sql_clause(
- txn.database_engine, "device_id", devices
+ rows = self.db_pool.simple_select_many_txn(
+ txn,
+ table="devices",
+ keyvalues={"user_id": user_id},
+ column="device_id",
+ iterable=devices,
+ retcols=("device_id",),
)
- sql = "SELECT device_id FROM devices WHERE user_id = ? AND " + clause
- # TODO: Maybe this needs to be done in batches if there are
- # too many local devices for a given user.
- txn.execute(sql, [user_id] + list(args))
- for row in txn:
+ for row in rows:
# Only insert into the local inbox if the device exists on
# this server
- device = row[0]
- message_json = json_encoder.encode(messages_by_device[device])
- messages_json_for_user[device] = message_json
+ device_id = row["device_id"]
+ message_json = json_encoder.encode(messages_by_device[device_id])
+ messages_json_for_user[device_id] = message_json
if messages_json_for_user:
local_by_user_then_device[user_id] = messages_json_for_user
@@ -466,14 +520,52 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
if not local_by_user_then_device:
return
- sql = (
- "INSERT INTO device_inbox"
- " (user_id, device_id, stream_id, message_json)"
- " VALUES (?,?,?,?)"
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="device_inbox",
+ values=[
+ {
+ "user_id": user_id,
+ "device_id": device_id,
+ "stream_id": stream_id,
+ "message_json": message_json,
+ "instance_name": self._instance_name,
+ }
+ for user_id, messages_by_device in local_by_user_then_device.items()
+ for device_id, message_json in messages_by_device.items()
+ ],
)
- rows = []
- for user_id, messages_by_device in local_by_user_then_device.items():
- for device_id, message_json in messages_by_device.items():
- rows.append((user_id, device_id, stream_id, message_json))
- txn.executemany(sql, rows)
+
+class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
+ DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
+
+ def __init__(self, database: DatabasePool, db_conn, hs):
+ super().__init__(database, db_conn, hs)
+
+ self.db_pool.updates.register_background_index_update(
+ "device_inbox_stream_index",
+ index_name="device_inbox_stream_id_user_id",
+ table="device_inbox",
+ columns=["stream_id", "user_id"],
+ )
+
+ self.db_pool.updates.register_background_update_handler(
+ self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
+ )
+
+ async def _background_drop_index_device_inbox(self, progress, batch_size):
+ def reindex_txn(conn):
+ txn = conn.cursor()
+ txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
+ txn.close()
+
+ await self.db_pool.runWithConnection(reindex_txn)
+
+ await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)
+
+ return 1
+
+
+class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
+ pass
diff --git a/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql
new file mode 100644
index 0000000000..d781a92fec
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE device_inbox ADD COLUMN instance_name TEXT;
+ALTER TABLE device_federation_inbox ADD COLUMN instance_name TEXT;
+ALTER TABLE device_federation_outbox ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres
new file mode 100644
index 0000000000..45a845a3a5
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres
@@ -0,0 +1,25 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS device_inbox_sequence;
+
+-- We need to take the max across both device_inbox and device_federation_outbox
+-- tables as they share the ID generator
+SELECT setval('device_inbox_sequence', (
+ SELECT GREATEST(
+ (SELECT COALESCE(MAX(stream_id), 1) FROM device_inbox),
+ (SELECT COALESCE(MAX(stream_id), 1) FROM device_federation_outbox)
+ )
+));
|