From 3ad699cc65dc55d4329a59a5f621ac4dadaa0fc5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 16 Dec 2020 14:52:04 +0000 Subject: Fix generate_log_config script (#8952) It used to write an empty file if you gave it a -o arg. --- scripts/generate_log_config | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'scripts') diff --git a/scripts/generate_log_config b/scripts/generate_log_config index b6957f48a3..a13a5634a3 100755 --- a/scripts/generate_log_config +++ b/scripts/generate_log_config @@ -40,4 +40,6 @@ if __name__ == "__main__": ) args = parser.parse_args() - args.output_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=args.log_file)) + out = args.output_file + out.write(DEFAULT_LOG_CONFIG.substitute(log_file=args.log_file)) + out.flush() -- cgit 1.5.1 From b530eaa262b9c8af378f976e5d2628e8c02b10d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Jan 2021 20:19:26 +0000 Subject: Allow running sendToDevice on workers (#9044) --- changelog.d/9044.feature | 1 + scripts/synapse_port_db | 27 ++++ synapse/app/generic_worker.py | 3 + synapse/config/workers.py | 10 +- synapse/handlers/devicemessage.py | 31 +++-- synapse/replication/slave/storage/deviceinbox.py | 32 +---- synapse/replication/tcp/handler.py | 9 ++ synapse/storage/databases/main/__init__.py | 33 ----- synapse/storage/databases/main/deviceinbox.py | 147 ++++++++++++++++----- .../schema/delta/59/02shard_send_to_device.sql | 18 +++ .../03shard_send_to_device_sequence.sql.postgres | 25 ++++ 11 files changed, 231 insertions(+), 105 deletions(-) create mode 100644 changelog.d/9044.feature create mode 100644 synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql create mode 100644 synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres (limited to 'scripts') diff --git a/changelog.d/9044.feature b/changelog.d/9044.feature new file mode 100644 index 0000000000..4ec319f1f2 --- /dev/null +++ b/changelog.d/9044.feature @@ -0,0 +1 @@ +Add experimental support for handling and persistence of to-device messages to happen on worker processes. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 5ad17aa90f..22dd169bfb 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -629,6 +629,7 @@ class Porter(object): await self._setup_state_group_id_seq() await self._setup_user_id_seq() await self._setup_events_stream_seqs() + await self._setup_device_inbox_seq() # Step 3. Get tables. self.progress.set_state("Fetching tables") @@ -911,6 +912,32 @@ class Porter(object): "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos, ) + async def _setup_device_inbox_seq(self): + """Set the device inbox sequence to the correct value. + """ + curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="device_inbox", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), 1)", + allow_none=True, + ) + + curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="device_federation_outbox", + keyvalues={}, + retcol="COALESCE(MAX(stream_id), 1)", + allow_none=True, + ) + + next_id = max(curr_local_id, curr_federation_id) + 1 + + def r(txn): + txn.execute( + "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,) + ) + + return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r) + ############################################## # The following is simply UI stuff diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index fa23d9bb20..4428472707 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -108,6 +108,7 @@ from synapse.rest.client.v2_alpha.account_data import ( ) from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet +from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet from synapse.rest.client.versions import VersionsRestServlet from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyApiV2Resource @@ -520,6 +521,8 @@ class GenericWorkerServer(HomeServer): room.register_deprecated_servlets(self, resource) InitialSyncRestServlet(self).register(resource) + SendToDeviceRestServlet(self).register(resource) + user_directory.register_servlets(self, resource) # If presence is disabled, use the stub servlet that does diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 7ca9efec52..364583f48b 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -53,6 +53,9 @@ class WriterLocations: default=["master"], type=List[str], converter=_instance_to_list_converter ) typing = attr.ib(default="master", type=str) + to_device = attr.ib( + default=["master"], type=List[str], converter=_instance_to_list_converter, + ) class WorkerConfig(Config): @@ -124,7 +127,7 @@ class WorkerConfig(Config): # Check that the configured writers for events and typing also appears in # `instance_map`. - for stream in ("events", "typing"): + for stream in ("events", "typing", "to_device"): instances = _instance_to_list_converter(getattr(self.writers, stream)) for instance in instances: if instance != "master" and instance not in self.instance_map: @@ -133,6 +136,11 @@ class WorkerConfig(Config): % (instance, stream) ) + if len(self.writers.to_device) != 1: + raise ConfigError( + "Must only specify one instance to handle `to_device` messages." + ) + self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events) # Whether this worker should run background tasks or not. diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index eb10d2b4bd..fc974a82e8 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -45,11 +45,25 @@ class DeviceMessageHandler: self.store = hs.get_datastore() self.notifier = hs.get_notifier() self.is_mine = hs.is_mine - self.federation = hs.get_federation_sender() - hs.get_federation_registry().register_edu_handler( - "m.direct_to_device", self.on_direct_to_device_edu - ) + # We only need to poke the federation sender explicitly if its on the + # same instance. Other federation sender instances will get notified by + # `synapse.app.generic_worker.FederationSenderHandler` when it sees it + # in the to-device replication stream. + self.federation_sender = None + if hs.should_send_federation(): + self.federation_sender = hs.get_federation_sender() + + # If we can handle the to device EDUs we do so, otherwise we route them + # to the appropriate worker. + if hs.get_instance_name() in hs.config.worker.writers.to_device: + hs.get_federation_registry().register_edu_handler( + "m.direct_to_device", self.on_direct_to_device_edu + ) + else: + hs.get_federation_registry().register_instances_for_edu( + "m.direct_to_device", hs.config.worker.writers.to_device, + ) # The handler to call when we think a user's device list might be out of # sync. We do all device list resyncing on the master instance, so if @@ -204,7 +218,8 @@ class DeviceMessageHandler: ) log_kv({"remote_messages": remote_messages}) - for destination in remote_messages.keys(): - # Enqueue a new federation transaction to send the new - # device messages to each remote destination. - self.federation.send_device_messages(destination) + if self.federation_sender: + for destination in remote_messages.keys(): + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + self.federation_sender.send_device_messages(destination) diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 62b68dd6e9..1260f6d141 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -14,38 +14,8 @@ # limitations under the License. from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker -from synapse.replication.tcp.streams import ToDeviceStream -from synapse.storage.database import DatabasePool from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore -from synapse.util.caches.stream_change_cache import StreamChangeCache class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - self._device_inbox_id_gen = SlavedIdTracker( - db_conn, "device_inbox", "stream_id" - ) - self._device_inbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", - self._device_inbox_id_gen.get_current_token(), - ) - self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceFederationOutboxStreamChangeCache", - self._device_inbox_id_gen.get_current_token(), - ) - - def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == ToDeviceStream.NAME: - self._device_inbox_id_gen.advance(instance_name, token) - for row in rows: - if row.entity.startswith("@"): - self._device_inbox_stream_cache.entity_has_changed( - row.entity, token - ) - else: - self._device_federation_outbox_stream_cache.entity_has_changed( - row.entity, token - ) - return super().process_replication_rows(stream_name, instance_name, token, rows) + pass diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 95e5502bf2..1f89249475 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -56,6 +56,7 @@ from synapse.replication.tcp.streams import ( EventsStream, FederationStream, Stream, + ToDeviceStream, TypingStream, ) @@ -115,6 +116,14 @@ class ReplicationCommandHandler: continue + if isinstance(stream, ToDeviceStream): + # Only add ToDeviceStream as a source on instances in charge of + # sending to device messages. + if hs.get_instance_name() in hs.config.worker.writers.to_device: + self._streams_to_replicate.append(stream) + + continue + if isinstance(stream, TypingStream): # Only add TypingStream as a source on the instance in charge of # typing. diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 701748f93b..c4de07a0a8 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -127,9 +127,6 @@ class DataStore( self._presence_id_gen = StreamIdGenerator( db_conn, "presence_stream", "stream_id" ) - self._device_inbox_id_gen = StreamIdGenerator( - db_conn, "device_inbox", "stream_id" - ) self._public_room_id_gen = StreamIdGenerator( db_conn, "public_room_list_stream", "stream_id" ) @@ -189,36 +186,6 @@ class DataStore( prefilled_cache=presence_cache_prefill, ) - max_device_inbox_id = self._device_inbox_id_gen.get_current_token() - device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict( - db_conn, - "device_inbox", - entity_column="user_id", - stream_column="stream_id", - max_value=max_device_inbox_id, - limit=1000, - ) - self._device_inbox_stream_cache = StreamChangeCache( - "DeviceInboxStreamChangeCache", - min_device_inbox_id, - prefilled_cache=device_inbox_prefill, - ) - # The federation outbox and the local device inbox uses the same - # stream_id generator. - device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict( - db_conn, - "device_federation_outbox", - entity_column="destination", - stream_column="stream_id", - max_value=max_device_inbox_id, - limit=1000, - ) - self._device_federation_outbox_stream_cache = StreamChangeCache( - "DeviceFederationOutboxStreamChangeCache", - min_device_outbox_id, - prefilled_cache=device_outbox_prefill, - ) - device_list_max = self._device_list_id_gen.get_current_token() self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", device_list_max diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index eb72c21155..58d3f71e45 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -17,10 +17,14 @@ import logging from typing import List, Tuple from synapse.logging.opentracing import log_kv, set_tag, trace +from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool +from synapse.storage.engines import PostgresEngine +from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -29,6 +33,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) + self._instance_name = hs.get_instance_name() + # Map of (user_id, device_id) to the last stream_id that has been # deleted up to. This is so that we can no op deletions. self._last_device_delete_cache = ExpiringCache( @@ -38,6 +44,73 @@ class DeviceInboxWorkerStore(SQLBaseStore): expiry_ms=30 * 60 * 1000, ) + if isinstance(database.engine, PostgresEngine): + self._can_write_to_device = ( + self._instance_name in hs.config.worker.writers.to_device + ) + + self._device_inbox_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="to_device", + instance_name=self._instance_name, + table="device_inbox", + instance_column="instance_name", + id_column="stream_id", + sequence_name="device_inbox_sequence", + writers=hs.config.worker.writers.to_device, + ) + else: + self._can_write_to_device = True + self._device_inbox_id_gen = StreamIdGenerator( + db_conn, "device_inbox", "stream_id" + ) + + max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict( + db_conn, + "device_inbox", + entity_column="user_id", + stream_column="stream_id", + max_value=max_device_inbox_id, + limit=1000, + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", + min_device_inbox_id, + prefilled_cache=device_inbox_prefill, + ) + + # The federation outbox and the local device inbox uses the same + # stream_id generator. + device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict( + db_conn, + "device_federation_outbox", + entity_column="destination", + stream_column="stream_id", + max_value=max_device_inbox_id, + limit=1000, + ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceFederationOutboxStreamChangeCache", + min_device_outbox_id, + prefilled_cache=device_outbox_prefill, + ) + + def process_replication_rows(self, stream_name, instance_name, token, rows): + if stream_name == ToDeviceStream.NAME: + self._device_inbox_id_gen.advance(instance_name, token) + for row in rows: + if row.entity.startswith("@"): + self._device_inbox_stream_cache.entity_has_changed( + row.entity, token + ) + else: + self._device_federation_outbox_stream_cache.entity_has_changed( + row.entity, token + ) + return super().process_replication_rows(stream_name, instance_name, token, rows) + def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() @@ -290,38 +363,6 @@ class DeviceInboxWorkerStore(SQLBaseStore): "get_all_new_device_messages", get_all_new_device_messages_txn ) - -class DeviceInboxBackgroundUpdateStore(SQLBaseStore): - DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" - - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - - self.db_pool.updates.register_background_index_update( - "device_inbox_stream_index", - index_name="device_inbox_stream_id_user_id", - table="device_inbox", - columns=["stream_id", "user_id"], - ) - - self.db_pool.updates.register_background_update_handler( - self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox - ) - - async def _background_drop_index_device_inbox(self, progress, batch_size): - def reindex_txn(conn): - txn = conn.cursor() - txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") - txn.close() - - await self.db_pool.runWithConnection(reindex_txn) - - await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID) - - return 1 - - -class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): @trace async def add_messages_to_device_inbox( self, @@ -340,6 +381,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) The new stream_id. """ + assert self._can_write_to_device + def add_messages_txn(txn, now_ms, stream_id): # Add the local messages directly to the local inbox. self._add_messages_to_local_device_inbox_txn( @@ -358,6 +401,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) "stream_id": stream_id, "queued_ts": now_ms, "messages_json": json_encoder.encode(edu), + "instance_name": self._instance_name, } for destination, edu in remote_messages_by_destination.items() ], @@ -380,6 +424,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) async def add_messages_from_remote_to_device_inbox( self, origin: str, message_id: str, local_messages_by_user_then_device: dict ) -> int: + assert self._can_write_to_device + def add_messages_txn(txn, now_ms, stream_id): # Check if we've already inserted a matching message_id for that # origin. This can happen if the origin doesn't receive our @@ -428,6 +474,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) def _add_messages_to_local_device_inbox_txn( self, txn, stream_id, messages_by_user_then_device ): + assert self._can_write_to_device + local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} @@ -481,8 +529,43 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) "device_id": device_id, "stream_id": stream_id, "message_json": message_json, + "instance_name": self._instance_name, } for user_id, messages_by_device in local_by_user_then_device.items() for device_id, message_json in messages_by_device.items() ], ) + + +class DeviceInboxBackgroundUpdateStore(SQLBaseStore): + DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" + + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_index_update( + "device_inbox_stream_index", + index_name="device_inbox_stream_id_user_id", + table="device_inbox", + columns=["stream_id", "user_id"], + ) + + self.db_pool.updates.register_background_update_handler( + self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox + ) + + async def _background_drop_index_device_inbox(self, progress, batch_size): + def reindex_txn(conn): + txn = conn.cursor() + txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") + txn.close() + + await self.db_pool.runWithConnection(reindex_txn) + + await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID) + + return 1 + + +class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): + pass diff --git a/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql new file mode 100644 index 0000000000..d781a92fec --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE device_inbox ADD COLUMN instance_name TEXT; +ALTER TABLE device_federation_inbox ADD COLUMN instance_name TEXT; +ALTER TABLE device_federation_outbox ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres new file mode 100644 index 0000000000..45a845a3a5 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres @@ -0,0 +1,25 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS device_inbox_sequence; + +-- We need to take the max across both device_inbox and device_federation_outbox +-- tables as they share the ID generator +SELECT setval('device_inbox_sequence', ( + SELECT GREATEST( + (SELECT COALESCE(MAX(stream_id), 1) FROM device_inbox), + (SELECT COALESCE(MAX(stream_id), 1) FROM device_federation_outbox) + ) +)); -- cgit 1.5.1 From 7036e24e98fc21855c34876d7024015470721bbe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Jan 2021 15:18:27 +0000 Subject: Add background update for add chain cover index (#9029) --- changelog.d/8868.misc | 2 +- changelog.d/9029.misc | 1 + scripts/synapse_port_db | 2 +- synapse/storage/databases/main/events.py | 50 ++++-- .../storage/databases/main/events_bg_updates.py | 192 ++++++++++++++++++++- .../main/schema/delta/59/06chain_cover_index.sql | 17 ++ tests/storage/test_event_chain.py | 114 ++++++++++++ 7 files changed, 360 insertions(+), 18 deletions(-) create mode 100644 changelog.d/9029.misc create mode 100644 synapse/storage/databases/main/schema/delta/59/06chain_cover_index.sql (limited to 'scripts') diff --git a/changelog.d/8868.misc b/changelog.d/8868.misc index 1a11e30944..346741d982 100644 --- a/changelog.d/8868.misc +++ b/changelog.d/8868.misc @@ -1 +1 @@ -Improve efficiency of large state resolutions for new rooms. +Improve efficiency of large state resolutions. diff --git a/changelog.d/9029.misc b/changelog.d/9029.misc new file mode 100644 index 0000000000..346741d982 --- /dev/null +++ b/changelog.d/9029.misc @@ -0,0 +1 @@ +Improve efficiency of large state resolutions. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 22dd169bfb..69bf9110a6 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -70,7 +70,7 @@ logger = logging.getLogger("synapse_port_db") BOOLEAN_COLUMNS = { "events": ["processed", "outlier", "contains_url"], - "rooms": ["is_public"], + "rooms": ["is_public", "has_auth_chain_index"], "event_edges": ["is_state"], "presence_list": ["accepted"], "presence_stream": ["currently_active"], diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 186f064036..e0fbcc58cf 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -466,9 +466,6 @@ class PersistEventsStore: if not state_events: return - # Map from event ID to chain ID/sequence number. - chain_map = {} # type: Dict[str, Tuple[int, int]] - # We need to know the type/state_key and auth events of the events we're # calculating chain IDs for. We don't rely on having the full Event # instances as we'll potentially be pulling more events from the DB and @@ -479,9 +476,33 @@ class PersistEventsStore: event_to_auth_chain = { e.event_id: e.auth_event_ids() for e in state_events.values() } + event_to_room_id = {e.event_id: e.room_id for e in state_events.values()} + + self._add_chain_cover_index( + txn, event_to_room_id, event_to_types, event_to_auth_chain + ) + + def _add_chain_cover_index( + self, + txn, + event_to_room_id: Dict[str, str], + event_to_types: Dict[str, Tuple[str, str]], + event_to_auth_chain: Dict[str, List[str]], + ) -> None: + """Calculate the chain cover index for the given events. + + Args: + event_to_room_id: Event ID to the room ID of the event + event_to_types: Event ID to type and state_key of the event + event_to_auth_chain: Event ID to list of auth event IDs of the + event (events with no auth events can be excluded). + """ + + # Map from event ID to chain ID/sequence number. + chain_map = {} # type: Dict[str, Tuple[int, int]] # Set of event IDs to calculate chain ID/seq numbers for. - events_to_calc_chain_id_for = set(state_events) + events_to_calc_chain_id_for = set(event_to_room_id) # We check if there are any events that need to be handled in the rooms # we're looking at. These should just be out of band memberships, where @@ -491,7 +512,7 @@ class PersistEventsStore: table="event_auth_chain_to_calculate", keyvalues={}, column="room_id", - iterable={e.room_id for e in state_events.values()}, + iterable=set(event_to_room_id.values()), retcols=("event_id", "type", "state_key"), ) for row in rows: @@ -582,16 +603,17 @@ class PersistEventsStore: # the list of events to calculate chain IDs for next time # around. (Otherwise we will have already added it to the # table). - event = state_events.get(event_id) - if event: + room_id = event_to_room_id.get(event_id) + if room_id: + e_type, state_key = event_to_types[event_id] self.db_pool.simple_insert_txn( txn, table="event_auth_chain_to_calculate", values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, + "event_id": event_id, + "room_id": room_id, + "type": e_type, + "state_key": state_key, }, ) @@ -617,7 +639,7 @@ class PersistEventsStore: events_to_calc_chain_id_for, event_to_auth_chain ): existing_chain_id = None - for auth_id in event_to_auth_chain[event_id]: + for auth_id in event_to_auth_chain.get(event_id, []): if event_to_types.get(event_id) == event_to_types.get(auth_id): existing_chain_id = chain_map[auth_id] break @@ -730,11 +752,11 @@ class PersistEventsStore: # auth events (A, B) to check if B is reachable from A. reduction = { a_id - for a_id in event_to_auth_chain[event_id] + for a_id in event_to_auth_chain.get(event_id, []) if chain_map[a_id][0] != chain_id } for start_auth_id, end_auth_id in itertools.permutations( - event_to_auth_chain[event_id], r=2, + event_to_auth_chain.get(event_id, []), r=2, ): if chain_links.exists_path_from( chain_map[start_auth_id], chain_map[end_auth_id] diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 7e4b175d08..90a40a92b4 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -14,13 +14,13 @@ # limitations under the License. import logging -from typing import List, Tuple +from typing import Dict, List, Optional, Tuple from synapse.api.constants import EventContentFields from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import make_event_from_dict from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, make_tuple_comparison_clause from synapse.storage.types import Cursor from synapse.types import JsonDict @@ -108,6 +108,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): "rejected_events_metadata", self._rejected_events_metadata, ) + self.db_pool.updates.register_background_update_handler( + "chain_cover", self._chain_cover_index, + ) + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -706,3 +710,187 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) return len(results) + + async def _chain_cover_index(self, progress: dict, batch_size: int) -> int: + """A background updates that iterates over all rooms and generates the + chain cover index for them. + """ + + current_room_id = progress.get("current_room_id", "") + + # Have we finished processing the current room. + finished = progress.get("finished", True) + + # Where we've processed up to in the room, defaults to the start of the + # room. + last_depth = progress.get("last_depth", -1) + last_stream = progress.get("last_stream", -1) + + # Have we set the `has_auth_chain_index` for the room yet. + has_set_room_has_chain_index = progress.get( + "has_set_room_has_chain_index", False + ) + + if finished: + # If we've finished with the previous room (or its our first + # iteration) we move on to the next room. + + def _get_next_room(txn: Cursor) -> Optional[str]: + sql = """ + SELECT room_id FROM rooms + WHERE room_id > ? + AND ( + NOT has_auth_chain_index + OR has_auth_chain_index IS NULL + ) + ORDER BY room_id + LIMIT 1 + """ + txn.execute(sql, (current_room_id,)) + row = txn.fetchone() + if row: + return row[0] + + return None + + current_room_id = await self.db_pool.runInteraction( + "_chain_cover_index", _get_next_room + ) + if not current_room_id: + await self.db_pool.updates._end_background_update("chain_cover") + return 0 + + logger.debug("Adding chain cover to %s", current_room_id) + + def _calculate_auth_chain( + txn: Cursor, last_depth: int, last_stream: int + ) -> Tuple[int, int, int]: + # Get the next set of events in the room (that we haven't already + # computed chain cover for). We do this in topological order. + + # We want to do a `(topological_ordering, stream_ordering) > (?,?)` + # comparison, but that is not supported on older SQLite versions + tuple_clause, tuple_args = make_tuple_comparison_clause( + self.database_engine, + [ + ("topological_ordering", last_depth), + ("stream_ordering", last_stream), + ], + ) + + sql = """ + SELECT + event_id, state_events.type, state_events.state_key, + topological_ordering, stream_ordering + FROM events + INNER JOIN state_events USING (event_id) + LEFT JOIN event_auth_chains USING (event_id) + LEFT JOIN event_auth_chain_to_calculate USING (event_id) + WHERE events.room_id = ? + AND event_auth_chains.event_id IS NULL + AND event_auth_chain_to_calculate.event_id IS NULL + AND %(tuple_cmp)s + ORDER BY topological_ordering, stream_ordering + LIMIT ? + """ % { + "tuple_cmp": tuple_clause, + } + + args = [current_room_id] + args.extend(tuple_args) + args.append(batch_size) + + txn.execute(sql, args) + rows = txn.fetchall() + + # Put the results in the necessary format for + # `_add_chain_cover_index` + event_to_room_id = {row[0]: current_room_id for row in rows} + event_to_types = {row[0]: (row[1], row[2]) for row in rows} + + new_last_depth = rows[-1][3] if rows else last_depth # type: int + new_last_stream = rows[-1][4] if rows else last_stream # type: int + + count = len(rows) + + # We also need to fetch the auth events for them. + auth_events = self.db_pool.simple_select_many_txn( + txn, + table="event_auth", + column="event_id", + iterable=event_to_room_id, + keyvalues={}, + retcols=("event_id", "auth_id"), + ) + + event_to_auth_chain = {} # type: Dict[str, List[str]] + for row in auth_events: + event_to_auth_chain.setdefault(row["event_id"], []).append( + row["auth_id"] + ) + + # Calculate and persist the chain cover index for this set of events. + # + # Annoyingly we need to gut wrench into the persit event store so that + # we can reuse the function to calculate the chain cover for rooms. + self.hs.get_datastores().persist_events._add_chain_cover_index( + txn, event_to_room_id, event_to_types, event_to_auth_chain, + ) + + return new_last_depth, new_last_stream, count + + last_depth, last_stream, count = await self.db_pool.runInteraction( + "_chain_cover_index", _calculate_auth_chain, last_depth, last_stream + ) + + total_rows_processed = count + + if count < batch_size and not has_set_room_has_chain_index: + # If we've done all the events in the room we flip the + # `has_auth_chain_index` in the DB. Note that its possible for + # further events to be persisted between the above and setting the + # flag without having the chain cover calculated for them. This is + # fine as a) the code gracefully handles these cases and b) we'll + # calculate them below. + + await self.db_pool.simple_update( + table="rooms", + keyvalues={"room_id": current_room_id}, + updatevalues={"has_auth_chain_index": True}, + desc="_chain_cover_index", + ) + has_set_room_has_chain_index = True + + # Handle any events that might have raced with us flipping the + # bit above. + last_depth, last_stream, count = await self.db_pool.runInteraction( + "_chain_cover_index", _calculate_auth_chain, last_depth, last_stream + ) + + total_rows_processed += count + + # Note that at this point its technically possible that more events + # than our `batch_size` have been persisted without their chain + # cover, so we need to continue processing this room if the last + # count returned was equal to the `batch_size`. + + if count < batch_size: + # We've finished calculating the index for this room, move on to the + # next room. + await self.db_pool.updates._background_update_progress( + "chain_cover", {"current_room_id": current_room_id, "finished": True}, + ) + else: + # We still have outstanding events to calculate the index for. + await self.db_pool.updates._background_update_progress( + "chain_cover", + { + "current_room_id": current_room_id, + "last_depth": last_depth, + "last_stream": last_stream, + "has_auth_chain_index": has_set_room_has_chain_index, + "finished": False, + }, + ) + + return total_rows_processed diff --git a/synapse/storage/databases/main/schema/delta/59/06chain_cover_index.sql b/synapse/storage/databases/main/schema/delta/59/06chain_cover_index.sql new file mode 100644 index 0000000000..fe3dca71dd --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/06chain_cover_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + (5906, 'chain_cover', '{}', 'rejected_events_metadata'); diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index 83c377824b..ff67a73749 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -20,7 +20,10 @@ from twisted.trial import unittest from synapse.api.constants import EventTypes from synapse.api.room_versions import RoomVersions from synapse.events import EventBase +from synapse.rest import admin +from synapse.rest.client.v1 import login, room from synapse.storage.databases.main.events import _LinkMap +from synapse.types import create_requester from tests.unittest import HomeserverTestCase @@ -470,3 +473,114 @@ class LinkMapTestCase(unittest.TestCase): self.assertCountEqual(link_map.get_links_between(1, 2), [(1, 1), (3, 3)]) self.assertCountEqual(link_map.get_additions(), [(1, 3, 2, 3), (2, 5, 1, 3)]) + + +class EventChainBackgroundUpdateTestCase(HomeserverTestCase): + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def test_background_update(self): + """Test that the background update to calculate auth chains for historic + rooms works correctly. + """ + + # Create a room + user_id = self.register_user("foo", "pass") + token = self.login("foo", "pass") + room_id = self.helper.create_room_as(user_id, tok=token) + requester = create_requester(user_id) + + store = self.hs.get_datastore() + + # Mark the room as not having a chain cover index + self.get_success( + store.db_pool.simple_update( + table="rooms", + keyvalues={"room_id": room_id}, + updatevalues={"has_auth_chain_index": False}, + desc="test", + ) + ) + + # Create a fork in the DAG with different events. + event_handler = self.hs.get_event_creation_handler() + latest_event_ids = self.get_success(store.get_prev_events_for_room(room_id)) + event, context = self.get_success( + event_handler.create_event( + requester, + { + "type": "some_state_type", + "state_key": "", + "content": {}, + "room_id": room_id, + "sender": user_id, + }, + prev_event_ids=latest_event_ids, + ) + ) + self.get_success( + event_handler.handle_new_client_event(requester, event, context) + ) + state1 = list(self.get_success(context.get_current_state_ids()).values()) + + event, context = self.get_success( + event_handler.create_event( + requester, + { + "type": "some_state_type", + "state_key": "", + "content": {}, + "room_id": room_id, + "sender": user_id, + }, + prev_event_ids=latest_event_ids, + ) + ) + self.get_success( + event_handler.handle_new_client_event(requester, event, context) + ) + state2 = list(self.get_success(context.get_current_state_ids()).values()) + + # Delete the chain cover info. + + def _delete_tables(txn): + txn.execute("DELETE FROM event_auth_chains") + txn.execute("DELETE FROM event_auth_chain_links") + + self.get_success(store.db_pool.runInteraction("test", _delete_tables)) + + # Insert and run the background update. + self.get_success( + store.db_pool.simple_insert( + "background_updates", + {"update_name": "chain_cover", "progress_json": "{}"}, + ) + ) + + # Ugh, have to reset this flag + store.db_pool.updates._all_done = False + + while not self.get_success( + store.db_pool.updates.has_completed_background_updates() + ): + self.get_success( + store.db_pool.updates.do_next_background_update(100), by=0.1 + ) + + # Test that the `has_auth_chain_index` has been set + self.assertTrue(self.get_success(store.has_auth_chain_index(room_id))) + + # Test that calculating the auth chain difference using the newly + # calculated chain cover works. + self.get_success( + store.db_pool.runInteraction( + "test", + store._get_auth_chain_difference_using_cover_index_txn, + room_id, + [state1, state2], + ) + ) -- cgit 1.5.1