diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/data_stores/main/devices.py | 60 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql | 28 | ||||
-rw-r--r-- | synapse/storage/database.py | 74 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 169 |
4 files changed, 281 insertions, 50 deletions
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index ee3a2ab031..536cef3abd 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -55,6 +55,10 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes" +BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX = ( + "drop_device_lists_outbound_last_success_non_unique_idx" +) + class DeviceWorkerStore(SQLBaseStore): def get_device(self, user_id, device_id): @@ -342,32 +346,23 @@ class DeviceWorkerStore(SQLBaseStore): def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): # We update the device_lists_outbound_last_success with the successfully - # poked users. We do the join to see which users need to be inserted and - # which updated. + # poked users. sql = """ - SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) + SELECT user_id, coalesce(max(o.stream_id), 0) FROM device_lists_outbound_pokes as o - LEFT JOIN device_lists_outbound_last_success as s - USING (destination, user_id) WHERE destination = ? AND o.stream_id <= ? GROUP BY user_id """ txn.execute(sql, (destination, stream_id)) rows = txn.fetchall() - sql = """ - UPDATE device_lists_outbound_last_success - SET stream_id = ? - WHERE destination = ? AND user_id = ? - """ - txn.executemany(sql, ((row[1], destination, row[0]) for row in rows if row[2])) - - sql = """ - INSERT INTO device_lists_outbound_last_success - (destination, user_id, stream_id) VALUES (?, ?, ?) - """ - txn.executemany( - sql, ((destination, row[0], row[1]) for row in rows if not row[2]) + self.db.simple_upsert_many_txn( + txn=txn, + table="device_lists_outbound_last_success", + key_names=("destination", "user_id"), + key_values=((destination, user_id) for user_id, _ in rows), + value_names=("stream_id",), + value_values=((stream_id,) for _, stream_id in rows), ) # Delete all sent outbound pokes @@ -725,6 +720,21 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes, ) + # create a unique index on device_lists_outbound_last_success + self.db.updates.register_background_index_update( + "device_lists_outbound_last_success_unique_idx", + index_name="device_lists_outbound_last_success_unique_idx", + table="device_lists_outbound_last_success", + columns=["destination", "user_id"], + unique=True, + ) + + # once that completes, we can remove the old non-unique index. + self.db.updates.register_background_update_handler( + BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX, + self._drop_device_lists_outbound_last_success_non_unique_idx, + ) + @defer.inlineCallbacks def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): def f(conn): @@ -799,6 +809,20 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): return rows + async def _drop_device_lists_outbound_last_success_non_unique_idx( + self, progress, batch_size + ): + def f(txn): + txn.execute("DROP INDEX IF EXISTS device_lists_outbound_last_success_idx") + + await self.db.runInteraction( + "drop_device_lists_outbound_last_success_non_unique_idx", f, + ) + await self.db.updates._end_background_update( + BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX + ) + return 1 + class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): def __init__(self, database: Database, db_conn, hs): diff --git a/synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql b/synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql new file mode 100644 index 0000000000..d5e6deb878 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql @@ -0,0 +1,28 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- register a background update which will create a unique index on +-- device_lists_outbound_last_success +INSERT into background_updates (ordering, update_name, progress_json) + VALUES (5804, 'device_lists_outbound_last_success_unique_idx', '{}'); + +-- once that completes, we can drop the old index. +INSERT into background_updates (ordering, update_name, progress_json, depends_on) + VALUES ( + 5804, + 'drop_device_lists_outbound_last_success_non_unique_idx', + '{}', + 'device_lists_outbound_last_success_unique_idx' + ); diff --git a/synapse/storage/database.py b/synapse/storage/database.py index a7cd97b0b0..2b635d6ca0 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -49,6 +49,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.types import Connection, Cursor +from synapse.types import Collection from synapse.util.stringutils import exception_to_unicode logger = logging.getLogger(__name__) @@ -78,6 +79,7 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = { "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx", "device_lists_remote_cache": "device_lists_remote_cache_unique_idx", "event_search": "event_search_event_id_idx", + "device_lists_outbound_last_success": "device_lists_outbound_last_success_unique_idx", } @@ -889,20 +891,24 @@ class Database(object): txn.execute(sql, list(allvalues.values())) def simple_upsert_many_txn( - self, txn, table, key_names, key_values, value_names, value_values - ): + self, + txn: LoggingTransaction, + table: str, + key_names: Collection[str], + key_values: Collection[Iterable[Any]], + value_names: Collection[str], + value_values: Iterable[Iterable[str]], + ) -> None: """ Upsert, many times. Args: - table (str): The table to upsert into - key_names (list[str]): The key column names. - key_values (list[list]): A list of each row's key column values. - value_names (list[str]): The value column names. If empty, no - values will be used, even if value_values is provided. - value_values (list[list]): A list of each row's value column values. - Returns: - None + table: The table to upsert into + key_names: The key column names. + key_values: A list of each row's key column values. + value_names: The value column names + value_values: A list of each row's value column values. + Ignored if value_names is empty. """ if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: return self.simple_upsert_many_txn_native_upsert( @@ -914,20 +920,24 @@ class Database(object): ) def simple_upsert_many_txn_emulated( - self, txn, table, key_names, key_values, value_names, value_values - ): + self, + txn: LoggingTransaction, + table: str, + key_names: Iterable[str], + key_values: Collection[Iterable[Any]], + value_names: Collection[str], + value_values: Iterable[Iterable[str]], + ) -> None: """ Upsert, many times, but without native UPSERT support or batching. Args: - table (str): The table to upsert into - key_names (list[str]): The key column names. - key_values (list[list]): A list of each row's key column values. - value_names (list[str]): The value column names. If empty, no - values will be used, even if value_values is provided. - value_values (list[list]): A list of each row's value column values. - Returns: - None + table: The table to upsert into + key_names: The key column names. + key_values: A list of each row's key column values. + value_names: The value column names + value_values: A list of each row's value column values. + Ignored if value_names is empty. """ # No value columns, therefore make a blank list so that the following # zip() works correctly. @@ -941,20 +951,24 @@ class Database(object): self.simple_upsert_txn_emulated(txn, table, _keys, _vals) def simple_upsert_many_txn_native_upsert( - self, txn, table, key_names, key_values, value_names, value_values - ): + self, + txn: LoggingTransaction, + table: str, + key_names: Collection[str], + key_values: Collection[Iterable[Any]], + value_names: Collection[str], + value_values: Iterable[Iterable[Any]], + ) -> None: """ Upsert, many times, using batching where possible. Args: - table (str): The table to upsert into - key_names (list[str]): The key column names. - key_values (list[list]): A list of each row's key column values. - value_names (list[str]): The value column names. If empty, no - values will be used, even if value_values is provided. - value_values (list[list]): A list of each row's value column values. - Returns: - None + table: The table to upsert into + key_names: The key column names. + key_values: A list of each row's key column values. + value_names: The value column names + value_values: A list of each row's value column values. + Ignored if value_names is empty. """ allnames = [] # type: List[str] allnames.extend(key_names) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 9d851beaa5..86d04ea9ac 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -16,6 +16,11 @@ import contextlib import threading from collections import deque +from typing import Dict, Set, Tuple + +from typing_extensions import Deque + +from synapse.storage.database import Database, LoggingTransaction class IdGenerator(object): @@ -87,7 +92,7 @@ class StreamIdGenerator(object): self._current = (max if step > 0 else min)( self._current, _load_current_id(db_conn, table, column, step) ) - self._unfinished_ids = deque() + self._unfinished_ids = deque() # type: Deque[int] def get_next(self): """ @@ -163,7 +168,7 @@ class ChainedIdGenerator(object): self.chained_generator = chained_generator self._lock = threading.Lock() self._current_max = _load_current_id(db_conn, table, column) - self._unfinished_ids = deque() + self._unfinished_ids = deque() # type: Deque[Tuple[int, int]] def get_next(self): """ @@ -198,3 +203,163 @@ class ChainedIdGenerator(object): return stream_id - 1, chained_id return self._current_max, self.chained_generator.get_current_token() + + +class MultiWriterIdGenerator: + """An ID generator that tracks a stream that can have multiple writers. + + Uses a Postgres sequence to coordinate ID assignment, but positions of other + writers will only get updated when `advance` is called (by replication). + + Note: Only works with Postgres. + + Args: + db_conn + db + instance_name: The name of this instance. + table: Database table associated with stream. + instance_column: Column that stores the row's writer's instance name + id_column: Column that stores the stream ID. + sequence_name: The name of the postgres sequence used to generate new + IDs. + """ + + def __init__( + self, + db_conn, + db: Database, + instance_name: str, + table: str, + instance_column: str, + id_column: str, + sequence_name: str, + ): + self._db = db + self._instance_name = instance_name + self._sequence_name = sequence_name + + # We lock as some functions may be called from DB threads. + self._lock = threading.Lock() + + self._current_positions = self._load_current_ids( + db_conn, table, instance_column, id_column + ) + + # Set of local IDs that we're still processing. The current position + # should be less than the minimum of this set (if not empty). + self._unfinished_ids = set() # type: Set[int] + + def _load_current_ids( + self, db_conn, table: str, instance_column: str, id_column: str + ) -> Dict[str, int]: + sql = """ + SELECT %(instance)s, MAX(%(id)s) FROM %(table)s + GROUP BY %(instance)s + """ % { + "instance": instance_column, + "id": id_column, + "table": table, + } + + cur = db_conn.cursor() + cur.execute(sql) + + # `cur` is an iterable over returned rows, which are 2-tuples. + current_positions = dict(cur) + + cur.close() + + return current_positions + + def _load_next_id_txn(self, txn): + txn.execute("SELECT nextval(?)", (self._sequence_name,)) + (next_id,) = txn.fetchone() + return next_id + + async def get_next(self): + """ + Usage: + with await stream_id_gen.get_next() as stream_id: + # ... persist event ... + """ + next_id = await self._db.runInteraction("_load_next_id", self._load_next_id_txn) + + # Assert the fetched ID is actually greater than what we currently + # believe the ID to be. If not, then the sequence and table have got + # out of sync somehow. + assert self.get_current_token() < next_id + + with self._lock: + self._unfinished_ids.add(next_id) + + @contextlib.contextmanager + def manager(): + try: + yield next_id + finally: + self._mark_id_as_finished(next_id) + + return manager() + + def get_next_txn(self, txn: LoggingTransaction): + """ + Usage: + + stream_id = stream_id_gen.get_next(txn) + # ... persist event ... + """ + + next_id = self._load_next_id_txn(txn) + + with self._lock: + self._unfinished_ids.add(next_id) + + txn.call_after(self._mark_id_as_finished, next_id) + txn.call_on_exception(self._mark_id_as_finished, next_id) + + return next_id + + def _mark_id_as_finished(self, next_id: int): + """The ID has finished being processed so we should advance the + current poistion if possible. + """ + + with self._lock: + self._unfinished_ids.discard(next_id) + + # Figure out if its safe to advance the position by checking there + # aren't any lower allocated IDs that are yet to finish. + if all(c > next_id for c in self._unfinished_ids): + curr = self._current_positions.get(self._instance_name, 0) + self._current_positions[self._instance_name] = max(curr, next_id) + + def get_current_token(self, instance_name: str = None) -> int: + """Gets the current position of a named writer (defaults to current + instance). + + Returns 0 if we don't have a position for the named writer (likely due + to it being a new writer). + """ + + if instance_name is None: + instance_name = self._instance_name + + with self._lock: + return self._current_positions.get(instance_name, 0) + + def get_positions(self) -> Dict[str, int]: + """Get a copy of the current positon map. + """ + + with self._lock: + return dict(self._current_positions) + + def advance(self, instance_name: str, new_id: int): + """Advance the postion of the named writer to the given ID, if greater + than existing entry. + """ + + with self._lock: + self._current_positions[instance_name] = max( + new_id, self._current_positions.get(instance_name, 0) + ) |