Diffstat (limited to 'synapse/storage')
38 files changed, 728 insertions, 436 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 142787fdfd..82b31d24f1 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -92,14 +92,12 @@ class BackgroundUpdater: self.db_pool = database # if a background update is currently running, its name. - self._current_background_update = None # type: Optional[str] - - self._background_update_performance = ( - {} - ) # type: Dict[str, BackgroundUpdatePerformance] - self._background_update_handlers = ( - {} - ) # type: Dict[str, Callable[[JsonDict, int], Awaitable[int]]] + self._current_background_update: Optional[str] = None + + self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {} + self._background_update_handlers: Dict[ + str, Callable[[JsonDict, int], Awaitable[int]] + ] = {} self._all_done = False def start_doing_background_updates(self) -> None: @@ -411,7 +409,7 @@ class BackgroundUpdater: c.execute(sql) if isinstance(self.db_pool.engine, engines.PostgresEngine): - runner = create_index_psql # type: Optional[Callable[[Connection], None]] + runner: Optional[Callable[[Connection], None]] = create_index_psql elif psql_only: runner = None else: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index d470cdacde..ccf9ac51ef 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -111,7 +111,7 @@ def make_conn( db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine, default_txn_name: str, -) -> Connection: +) -> "LoggingDatabaseConnection": """Make a new connection to the database and return it. Returns: @@ -670,8 +670,8 @@ class DatabasePool: Returns: The result of func """ - after_callbacks = [] # type: List[_CallbackListEntry] - exception_callbacks = [] # type: List[_CallbackListEntry] + after_callbacks: List[_CallbackListEntry] = [] + exception_callbacks: List[_CallbackListEntry] = [] if not current_context(): logger.warning("Starting db txn '%s' from sentinel context", desc) @@ -907,7 +907,7 @@ class DatabasePool: # The sort is to ensure that we don't rely on dictionary iteration # order. keys, vals = zip( - *[zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i] + *(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i) ) for k in keys: @@ -1090,7 +1090,7 @@ class DatabasePool: return False # We didn't find any existing rows, so insert a new one - allvalues = {} # type: Dict[str, Any] + allvalues: Dict[str, Any] = {} allvalues.update(keyvalues) allvalues.update(values) allvalues.update(insertion_values) @@ -1121,7 +1121,7 @@ class DatabasePool: values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting """ - allvalues = {} # type: Dict[str, Any] + allvalues: Dict[str, Any] = {} allvalues.update(keyvalues) allvalues.update(insertion_values or {}) @@ -1257,7 +1257,7 @@ class DatabasePool: value_values: A list of each row's value column values. Ignored if value_names is empty. 
""" - allnames = [] # type: List[str] + allnames: List[str] = [] allnames.extend(key_names) allnames.extend(value_names) @@ -1566,7 +1566,7 @@ class DatabasePool: """ keyvalues = keyvalues or {} - results = [] # type: List[Dict[str, Any]] + results: List[Dict[str, Any]] = [] if not iterable: return results @@ -1978,7 +1978,7 @@ class DatabasePool: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else "" - arg_list = [] # type: List[Any] + arg_list: List[Any] = [] if filters: where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters) arg_list += list(filters.values()) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 9f182c2a89..e2d1b758bd 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -48,9 +48,7 @@ def _make_exclusive_regex( ] if exclusive_user_regexes: exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes) - exclusive_user_pattern = re.compile( - exclusive_user_regex - ) # type: Optional[Pattern] + exclusive_user_pattern: Optional[Pattern] = re.compile(exclusive_user_regex) else: # We handle this case specially otherwise the constructed regex # will always match diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 50e7ddd735..c55508867d 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -203,9 +203,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): "delete_messages_for_device", delete_messages_for_device_txn ) - log_kv( - {"message": "deleted {} messages for device".format(count), "count": count} - ) + log_kv({"message": f"deleted {count} messages for device", "count": count}) # Update the cache, ensuring that we only ever increase the value last_deleted_stream_id = self._last_device_delete_cache.get( diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 0e3dd4e9ca..78ae68ec68 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -247,7 +247,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore): txn.execute(sql, query_params) - result = {} # type: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] + result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {} for (user_id, device_id, display_name, key_json) in txn: if include_deleted_devices: deleted_devices.remove((user_id, device_id)) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f23f8c6ecf..d39368c20e 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -16,6 +16,8 @@ import logging from queue import Empty, PriorityQueue from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple +from prometheus_client import Gauge + from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError from synapse.api.room_versions import RoomVersion @@ -32,6 +34,16 @@ from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter +oldest_pdu_in_federation_staging = Gauge( + "synapse_federation_server_oldest_inbound_pdu_in_staging", + "The age in seconds since we received the oldest 
pdu in the federation staging area", +) + +number_pdus_in_federation_queue = Gauge( + "synapse_federation_server_number_inbound_pdu_in_staging", + "The total number of events in the inbound federation staging", +) + logger = logging.getLogger(__name__) @@ -50,9 +62,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas ) # Cache of event ID to list of auth event IDs and their depths. - self._event_auth_cache = LruCache( + self._event_auth_cache: LruCache[str, List[Tuple[str, int]]] = LruCache( 500000, "_event_auth_cache", size_callback=len - ) # type: LruCache[str, List[Tuple[str, int]]] + ) + + self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000) async def get_auth_chain( self, room_id: str, event_ids: Collection[str], include_given: bool = False @@ -123,10 +137,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas initial_events = set(event_ids) # All the events that we've found that are reachable from the events. - seen_events = set() # type: Set[str] + seen_events: Set[str] = set() # A map from chain ID to max sequence number of the given events. - event_chains = {} # type: Dict[int, int] + event_chains: Dict[int, int] = {} sql = """ SELECT event_id, chain_id, sequence_number @@ -168,7 +182,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas """ # A map from chain ID to max sequence number *reachable* from any event ID. - chains = {} # type: Dict[int, int] + chains: Dict[int, int] = {} # Add all linked chains reachable from initial set of chains. for batch in batch_iter(event_chains, 1000): @@ -339,14 +353,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas initial_events = set(state_sets[0]).union(*state_sets[1:]) # Map from event_id -> (chain ID, seq no) - chain_info = {} # type: Dict[str, Tuple[int, int]] + chain_info: Dict[str, Tuple[int, int]] = {} # Map from chain ID -> seq no -> event Id - chain_to_event = {} # type: Dict[int, Dict[int, str]] + chain_to_event: Dict[int, Dict[int, str]] = {} # All the chains that we've found that are reachable from the state # sets. - seen_chains = set() # type: Set[int] + seen_chains: Set[int] = set() sql = """ SELECT event_id, chain_id, sequence_number @@ -378,9 +392,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # Corresponds to `state_sets`, except as a map from chain ID to max # sequence number reachable from the state set. - set_to_chain = [] # type: List[Dict[int, int]] + set_to_chain: List[Dict[int, int]] = [] for state_set in state_sets: - chains = {} # type: Dict[int, int] + chains: Dict[int, int] = {} set_to_chain.append(chains) for event_id in state_set: @@ -432,7 +446,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # Mapping from chain ID to the range of sequence numbers that should be # pulled from the database. - chain_to_gap = {} # type: Dict[int, Tuple[int, int]] + chain_to_gap: Dict[int, Tuple[int, int]] = {} for chain_id in seen_chains: min_seq_no = min(chains.get(chain_id, 0) for chains in set_to_chain) @@ -541,7 +555,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas } # The sorted list of events whose auth chains we should walk. - search = [] # type: List[Tuple[int, str]] + search: List[Tuple[int, str]] = [] # We need to get the depth of the initial events for sorting purposes. 
sql = """ @@ -564,7 +578,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas search.sort() # Map from event to its auth events - event_to_auth_events = {} # type: Dict[str, Set[str]] + event_to_auth_events: Dict[str, Set[str]] = {} base_sql = """ SELECT a.event_id, auth_id, depth @@ -1075,16 +1089,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas self, origin: str, event_id: str, - ) -> None: - """Remove the given event from the staging area""" - await self.db_pool.simple_delete( - table="federation_inbound_events_staging", - keyvalues={ - "origin": origin, - "event_id": event_id, - }, - desc="remove_received_event_from_staging", - ) + ) -> Optional[int]: + """Remove the given event from the staging area. + + Returns: + The received_ts of the row that was deleted, if any. + """ + if self.db_pool.engine.supports_returning: + + def _remove_received_event_from_staging_txn(txn): + sql = """ + DELETE FROM federation_inbound_events_staging + WHERE origin = ? AND event_id = ? + RETURNING received_ts + """ + + txn.execute(sql, (origin, event_id)) + return txn.fetchone() + + row = await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + db_autocommit=True, + ) + if row is None: + return None + + return row[0] + + else: + + def _remove_received_event_from_staging_txn(txn): + received_ts = self.db_pool.simple_select_one_onecol_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + retcol="received_ts", + allow_none=True, + ) + self.db_pool.simple_delete_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + ) + + return received_ts + + return await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + ) async def get_next_staged_event_id_for_room( self, @@ -1147,6 +1207,42 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return origin, event + async def get_all_rooms_with_staged_incoming_events(self) -> List[str]: + """Get the room IDs of all events currently staged.""" + return await self.db_pool.simple_select_onecol( + table="federation_inbound_events_staging", + keyvalues={}, + retcol="DISTINCT room_id", + desc="get_all_rooms_with_staged_incoming_events", + ) + + @wrap_as_background_process("_get_stats_for_federation_staging") + async def _get_stats_for_federation_staging(self): + """Update the prometheus metrics for the inbound federation staging area.""" + + def _get_stats_for_federation_staging_txn(txn): + txn.execute( + "SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging" + ) + (count,) = txn.fetchone() + + txn.execute( + "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging" + ) + + (received_ts,) = txn.fetchone() + + age = self._clock.time_msec() - received_ts + + return count, age + + count, age = await self.db_pool.runInteraction( + "_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn + ) + + number_pdus_in_federation_queue.set(count) + oldest_pdu_in_federation_staging.set(age) + class EventFederationStore(EventFederationWorkerStore): """Responsible for storing and serving up the various graphs associated diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index d1237c65cc..55caa6bbe7 100644 --- 
a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -759,7 +759,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): # object because we might not have the same amount of rows in each of them. To do # this, we use a dict indexed on the user ID and room ID to make it easier to # populate. - summaries = {} # type: Dict[Tuple[str, str], _EventPushSummary] + summaries: Dict[Tuple[str, str], _EventPushSummary] = {} for row in txn: summaries[(row[0], row[1])] = _EventPushSummary( unread_count=row[2], diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 897fa06639..a396a201d4 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -109,10 +109,8 @@ class PersistEventsStore: # Ideally we'd move these ID gens here, unfortunately some other ID # generators are chained off them so doing so is a bit of a PITA. - self._backfill_id_gen = ( - self.store._backfill_id_gen - ) # type: MultiWriterIdGenerator - self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator + self._backfill_id_gen: MultiWriterIdGenerator = self.store._backfill_id_gen + self._stream_id_gen: MultiWriterIdGenerator = self.store._stream_id_gen # This should only exist on instances that are configured to write assert ( @@ -221,7 +219,7 @@ class PersistEventsStore: Returns: Filtered event ids """ - results = [] # type: List[str] + results: List[str] = [] def _get_events_which_are_prevs_txn(txn, batch): sql = """ @@ -508,7 +506,7 @@ class PersistEventsStore: """ # Map from event ID to chain ID/sequence number. - chain_map = {} # type: Dict[str, Tuple[int, int]] + chain_map: Dict[str, Tuple[int, int]] = {} # Set of event IDs to calculate chain ID/seq numbers for. events_to_calc_chain_id_for = set(event_to_room_id) @@ -817,8 +815,8 @@ class PersistEventsStore: # new chain if the sequence number has already been allocated. # - existing_chains = set() # type: Set[int] - tree = [] # type: List[Tuple[str, Optional[str]]] + existing_chains: Set[int] = set() + tree: List[Tuple[str, Optional[str]]] = [] # We need to do this in a topologically sorted order as we want to # generate chain IDs/sequence numbers of an event's auth events before @@ -848,7 +846,7 @@ class PersistEventsStore: ) txn.execute(sql % (clause,), args) - chain_to_max_seq_no = {row[0]: row[1] for row in txn} # type: Dict[Any, int] + chain_to_max_seq_no: Dict[Any, int] = {row[0]: row[1] for row in txn} # Allocate the new events chain ID/sequence numbers. # @@ -858,8 +856,8 @@ class PersistEventsStore: # number of new chain IDs in one call, replacing all temporary # objects with real allocated chain IDs. - unallocated_chain_ids = set() # type: Set[object] - new_chain_tuples = {} # type: Dict[str, Tuple[Any, int]] + unallocated_chain_ids: Set[object] = set() + new_chain_tuples: Dict[str, Tuple[Any, int]] = {} for event_id, auth_event_id in tree: # If we reference an auth_event_id we fetch the allocated chain ID, # either from the existing `chain_map` or the newly generated @@ -870,7 +868,7 @@ class PersistEventsStore: if not existing_chain_id: existing_chain_id = chain_map[auth_event_id] - new_chain_tuple = None # type: Optional[Tuple[Any, int]] + new_chain_tuple: Optional[Tuple[Any, int]] = None if existing_chain_id: # We found a chain ID/sequence number candidate, check its # not already taken. 
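The two federation-staging gauges introduced earlier in this diff are refreshed from a periodic looping call rather than on each request. A minimal, self-contained sketch of that prometheus_client pattern, assuming the library is installed; the metric names and the plain time-based scheduling here are illustrative, not Synapse's:

```python
import time

from prometheus_client import Gauge, start_http_server

queue_depth = Gauge(
    "example_inbound_queue_depth",
    "Number of items currently sitting in the inbound staging queue",
)
oldest_item_age = Gauge(
    "example_inbound_queue_oldest_age_seconds",
    "Age in seconds of the oldest item in the inbound staging queue",
)


def update_staging_metrics(received_timestamps: list) -> None:
    # Called periodically (Synapse uses a 30s looping call); gauges are
    # simply overwritten with the latest observed values.
    queue_depth.set(len(received_timestamps))
    oldest = min(received_timestamps, default=time.time())
    oldest_item_age.set(time.time() - oldest)


if __name__ == "__main__":
    start_http_server(8000)  # expose /metrics for Prometheus to scrape
    update_staging_metrics([time.time() - 42.0, time.time() - 3.0])
    time.sleep(60)  # keep the process alive long enough to be scraped
```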
@@ -897,9 +895,9 @@ class PersistEventsStore: ) # Map from potentially temporary chain ID to real chain ID - chain_id_to_allocated_map = dict( + chain_id_to_allocated_map: Dict[Any, int] = dict( zip(unallocated_chain_ids, newly_allocated_chain_ids) - ) # type: Dict[Any, int] + ) chain_id_to_allocated_map.update((c, c) for c in existing_chains) return { @@ -1175,9 +1173,9 @@ class PersistEventsStore: Returns: list[(EventBase, EventContext)]: filtered list """ - new_events_and_contexts = ( - OrderedDict() - ) # type: OrderedDict[str, Tuple[EventBase, EventContext]] + new_events_and_contexts: OrderedDict[ + str, Tuple[EventBase, EventContext] + ] = OrderedDict() for event, context in events_and_contexts: prev_event_context = new_events_and_contexts.get(event.event_id) if prev_event_context: @@ -1205,7 +1203,7 @@ class PersistEventsStore: we are persisting backfilled (bool): True if the events were backfilled """ - depth_updates = {} # type: Dict[str, int] + depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids txn.call_after(self.store._invalidate_get_event_cache, event.event_id) @@ -1580,11 +1578,11 @@ class PersistEventsStore: # invalidate the cache for the redacted event txn.call_after(self.store._invalidate_get_event_cache, event.redacts) - self.db_pool.simple_insert_txn( + self.db_pool.simple_upsert_txn( txn, table="redactions", + keyvalues={"event_id": event.event_id}, values={ - "event_id": event.event_id, "redacts": event.redacts, "received_ts": self._clock.time_msec(), }, @@ -1885,7 +1883,7 @@ class PersistEventsStore: ), ) - room_to_event_ids = {} # type: Dict[str, List[str]] + room_to_event_ids: Dict[str, List[str]] = {} for e, _ in events_and_contexts: room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) @@ -2012,10 +2010,6 @@ class PersistEventsStore: Forward extremities are handled when we first start persisting the events. """ - events_by_room = {} # type: Dict[str, List[EventBase]] - for ev in events: - events_by_room.setdefault(ev.room_id, []).append(ev) - query = ( "INSERT INTO event_backward_extremities (event_id, room_id)" " SELECT ?, ? WHERE NOT EXISTS (" diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index da3a7df27b..6fcb2b8353 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -29,13 +29,18 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) -_REPLACE_STREAM_ORDRING_SQL_COMMANDS = ( +_REPLACE_STREAM_ORDERING_SQL_COMMANDS = ( # there should be no leftover rows without a stream_ordering2, but just in case... "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL", - # finally, we can drop the rule and switch the columns + # now we can drop the rule and switch the columns "DROP RULE populate_stream_ordering2 ON events", "ALTER TABLE events DROP COLUMN stream_ordering", "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering", + # ... 
and finally, rename the indexes into place for consistency with sqlite + "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index", + "ALTER INDEX events_order_room2 RENAME TO events_order_room", + "ALTER INDEX events_room_stream2 RENAME TO events_room_stream", + "ALTER INDEX events_ts2 RENAME TO events_ts", ) @@ -45,6 +50,10 @@ class _BackgroundUpdates: DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2" INDEX_STREAM_ORDERING2 = "index_stream_ordering2" + INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url" + INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order" + INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream" + INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts" REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" @@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) + ################################################################################ + # bg updates for replacing stream_ordering with a BIGINT # (these only run on postgres.) + self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.POPULATE_STREAM_ORDERING2, self._background_populate_stream_ordering2, ) + # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2); self.db_pool.updates.register_background_index_update( _BackgroundUpdates.INDEX_STREAM_ORDERING2, index_name="events_stream_ordering", @@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): columns=["stream_ordering2"], unique=True, ) + # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false; + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL, + index_name="event_contains_url_index2", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering2"], + where_clause="contains_url = true AND outlier = false", + ) + # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER, + index_name="events_order_room2", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering2"], + ) + # CREATE INDEX events_room_stream ON events(room_id, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM, + index_name="events_room_stream2", + table="events", + columns=["room_id", "stream_ordering2"], + ) + # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS, + index_name="events_ts2", + table="events", + columns=["origin_server_ts", "stream_ordering2"], + ) self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN, self._background_replace_stream_ordering_column, ) + ################################################################################ + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -916,9 +960,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): event_to_types = {row[0]: (row[1], row[2]) 
for row in rows} # Calculate the new last position we've processed up to. - new_last_depth = rows[-1][3] if rows else last_depth # type: int - new_last_stream = rows[-1][4] if rows else last_stream # type: int - new_last_room_id = rows[-1][5] if rows else "" # type: str + new_last_depth: int = rows[-1][3] if rows else last_depth + new_last_stream: int = rows[-1][4] if rows else last_stream + new_last_room_id: str = rows[-1][5] if rows else "" # Map from room_id to last depth/stream_ordering processed for the room, # excluding the last room (which we're likely still processing). We also @@ -945,7 +989,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): retcols=("event_id", "auth_id"), ) - event_to_auth_chain = {} # type: Dict[str, List[str]] + event_to_auth_chain: Dict[str, List[str]] = {} for row in auth_events: event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"]) @@ -1098,10 +1142,20 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place.""" def process(txn: Cursor) -> None: - for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS: + for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS: logger.info("completing stream_ordering migration: %s", sql) txn.execute(sql) + # ANALYZE the new column to build stats on it, to encourage PostgreSQL to use the + # indexes on it. + # We need to pass execute a dummy function to handle the txn's result otherwise + # it tries to call fetchall() on it and fails because there's no result to fetch. + await self.db_pool.execute( + "background_analyze_new_stream_ordering_column", + lambda txn: None, + "ANALYZE events(stream_ordering2)", + ) + await self.db_pool.runInteraction( "_background_replace_stream_ordering_column", process ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 86fd79f3a6..5ff29530bb 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1468,10 +1468,10 @@ class EventsWorkerStore(SQLBaseStore): # we need to make sure that, for every stream id in the results, we get *all* # the rows with that stream id. - rows = await self.db_pool.runInteraction( + rows: List[Tuple] = await self.db_pool.runInteraction( "get_all_updated_current_state_deltas", get_all_updated_current_state_deltas_txn, - ) # type: List[Tuple] + ) # if we've got fewer rows than the limit, we're good if len(rows) < target_row_count: @@ -1572,7 +1572,7 @@ class EventsWorkerStore(SQLBaseStore): """ mapping = {} - txn_id_to_event = {} # type: Dict[Tuple[str, int, str], str] + txn_id_to_event: Dict[Tuple[str, int, str], str] = {} for event in events: token_id = getattr(event.internal_metadata, "token_id", None) diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py index 66ad363bfb..e70d3649ff 100644 --- a/synapse/storage/databases/main/group_server.py +++ b/synapse/storage/databases/main/group_server.py @@ -27,8 +27,11 @@ from synapse.util import json_encoder _DEFAULT_CATEGORY_ID = "" _DEFAULT_ROLE_ID = "" + # A room in a group. 
-_RoomInGroup = TypedDict("_RoomInGroup", {"room_id": str, "is_public": bool}) +class _RoomInGroup(TypedDict): + room_id: str + is_public: bool class GroupServerWorkerStore(SQLBaseStore): @@ -92,6 +95,7 @@ class GroupServerWorkerStore(SQLBaseStore): "is_public": False # Whether this is a public room or not } """ + # TODO: Pagination def _get_rooms_in_group_txn(txn): diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index e76188328c..774861074c 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -310,14 +310,25 @@ class Lock: _excinst: Optional[BaseException], _exctb: Optional[TracebackType], ) -> bool: + await self.release() + + return False + + async def release(self) -> None: + """Release the lock. + + This is automatically called when using the lock as a context manager. + """ + + if self._dropped: + return + if self._looping_call.running: self._looping_call.stop() await self._store._drop_lock(self._lock_name, self._lock_key, self._token) self._dropped = True - return False - def __del__(self) -> None: if not self._dropped: # We should not be dropped without the lock being released (unless diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index c3f551d377..dc0bbc56ac 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -316,11 +316,140 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): return await self.db_pool.runInteraction("count_r30_users", _count_r30_users) + async def count_r30v2_users(self) -> Dict[str, int]: + """ + Counts the number of 30 day retained users, defined as users that: + - Appear more than once in the past 60 days + - Have more than 30 days between the most and least recent appearances that + occurred in the past 60 days. + + (This is the second version of this metric, hence R30'v2') + + Returns: + A mapping from client type to the number of 30-day retained users for that client. + + The dict keys are: + - "all" (a combined number of users across any and all clients) + - "android" (Element Android) + - "ios" (Element iOS) + - "electron" (Element Desktop) + - "web" (any web application -- it's not possible to distinguish Element Web here) + """ + + def _count_r30v2_users(txn): + thirty_days_in_secs = 86400 * 30 + now = int(self._clock.time()) + sixty_days_ago_in_secs = now - 2 * thirty_days_in_secs + one_day_from_now_in_secs = now + 86400 + + # This is the 'per-platform' count. + sql = """ + SELECT + client_type, + count(client_type) + FROM + ( + SELECT + user_id, + CASE + WHEN + LOWER(user_agent) LIKE '%%riot%%' OR + LOWER(user_agent) LIKE '%%element%%' + THEN CASE + WHEN + LOWER(user_agent) LIKE '%%electron%%' + THEN 'electron' + WHEN + LOWER(user_agent) LIKE '%%android%%' + THEN 'android' + WHEN + LOWER(user_agent) LIKE '%%ios%%' + THEN 'ios' + ELSE 'unknown' + END + WHEN + LOWER(user_agent) LIKE '%%mozilla%%' OR + LOWER(user_agent) LIKE '%%gecko%%' + THEN 'web' + ELSE 'unknown' + END as client_type + FROM + user_daily_visits + WHERE + timestamp > ? + AND + timestamp < ? + GROUP BY + user_id, + client_type + HAVING + max(timestamp) - min(timestamp) > ? 
+ ) AS temp + GROUP BY + client_type + ; + """ + + # We initialise all the client types to zero, so we get an explicit + # zero if they don't appear in the query results + results = {"ios": 0, "android": 0, "web": 0, "electron": 0} + txn.execute( + sql, + ( + sixty_days_ago_in_secs * 1000, + one_day_from_now_in_secs * 1000, + thirty_days_in_secs * 1000, + ), + ) + + for row in txn: + if row[0] == "unknown": + continue + results[row[0]] = row[1] + + # This is the 'all users' count. + sql = """ + SELECT COUNT(*) FROM ( + SELECT + 1 + FROM + user_daily_visits + WHERE + timestamp > ? + AND + timestamp < ? + GROUP BY + user_id + HAVING + max(timestamp) - min(timestamp) > ? + ) AS r30_users + """ + + txn.execute( + sql, + ( + sixty_days_ago_in_secs * 1000, + one_day_from_now_in_secs * 1000, + thirty_days_in_secs * 1000, + ), + ) + row = txn.fetchone() + if row is None: + results["all"] = 0 + else: + results["all"] = row[0] + + return results + + return await self.db_pool.runInteraction( + "count_r30v2_users", _count_r30v2_users + ) + def _get_start_of_day(self): """ Returns millisecond unixtime for start of UTC day. """ - now = time.gmtime() + now = time.gmtime(self._clock.time()) today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0)) return today_start * 1000 @@ -352,7 +481,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): ) udv ON u.user_id = udv.user_id AND u.device_id=udv.device_id INNER JOIN users ON users.name=u.user_id - WHERE last_seen > ? AND last_seen <= ? + WHERE ? <= last_seen AND last_seen < ? AND udv.timestamp IS NULL AND users.is_guest=0 AND users.appservice_id IS NULL GROUP BY u.user_id, u.device_id diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index 9b4e95e134..ba7075caa5 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -73,20 +73,20 @@ class ProfileWorkerStore(SQLBaseStore): async def set_profile_displayname( self, user_localpart: str, new_displayname: Optional[str] ) -> None: - await self.db_pool.simple_update_one( + await self.db_pool.simple_upsert( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, + values={"displayname": new_displayname}, desc="set_profile_displayname", ) async def set_profile_avatar_url( self, user_localpart: str, new_avatar_url: Optional[str] ) -> None: - await self.db_pool.simple_update_one( + await self.db_pool.simple_upsert( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"avatar_url": new_avatar_url}, + values={"avatar_url": new_avatar_url}, desc="set_profile_avatar_url", ) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 7fb7780d0f..664c65dac5 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -115,7 +115,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): logger.info("[purge] looking for events to delete") should_delete_expr = "state_key IS NULL" - should_delete_params = () # type: Tuple[Any, ...] + should_delete_params: Tuple[Any, ...] = () if not delete_local_events: should_delete_expr += " AND event_id NOT LIKE ?" 
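The profile setters just above switch from `simple_update_one` to `simple_upsert`, so setting a display name or avatar URL now creates the row when it is missing instead of raising. A minimal sketch of the underlying insert-or-update semantics, using the standard library's sqlite3 and assuming an SQLite new enough (3.24+) for native `ON CONFLICT ... DO UPDATE`; the table layout is illustrative, not Synapse's schema:

```python
import sqlite3

# Not Synapse's DatabasePool API -- just the upsert behaviour it wraps.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE profiles (user_id TEXT PRIMARY KEY, displayname TEXT)")


def set_displayname(user_id: str, displayname: str) -> None:
    # Creates the row if it is missing, otherwise updates it in place.
    conn.execute(
        """
        INSERT INTO profiles (user_id, displayname) VALUES (?, ?)
        ON CONFLICT (user_id) DO UPDATE SET displayname = excluded.displayname
        """,
        (user_id, displayname),
    )


set_displayname("@alice:example.org", "Alice")
set_displayname("@alice:example.org", "Alice Cooper")  # updates, does not fail
```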
@@ -215,6 +215,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): "event_relations", "event_search", "rejections", + "redactions", ): logger.info("[purge] removing events from %s", table) @@ -392,7 +393,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): "room_memberships", "room_stats_state", "room_stats_current", - "room_stats_historical", "room_stats_earliest_token", "rooms", "stream_ordering_to_exterm", diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index db52176337..a7fb8cd848 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -79,9 +79,9 @@ class PushRulesWorkerStore( super().__init__(database, db_conn, hs) if hs.config.worker.worker_app is None: - self._push_rules_stream_id_gen = StreamIdGenerator( - db_conn, "push_rules_stream", "stream_id" - ) # type: Union[StreamIdGenerator, SlavedIdTracker] + self._push_rules_stream_id_gen: Union[ + StreamIdGenerator, SlavedIdTracker + ] = StreamIdGenerator(db_conn, "push_rules_stream", "stream_id") else: self._push_rules_stream_id_gen = SlavedIdTracker( db_conn, "push_rules_stream", "stream_id" diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index e31c5864ac..6ad1a0cf7f 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1744,7 +1744,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): items = keyvalues.items() where_clause = " AND ".join(k + " = ?" for k, _ in items) - values = [v for _, v in items] # type: List[Union[str, int]] + values: List[Union[str, int]] = [v for _, v in items] # Conveniently, refresh_tokens and access_tokens both use the user_id and device_id fields. Only caveat # is the `except_token_id` param that is tricky to get right, so for now we're just using the same where # clause and values before we handle that. This seems to be only used in the "set password" handler. 
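Much of the mechanical churn in this diff, including the `push_rule.py` and `registration.py` hunks above, is converting mypy `# type:` comments into PEP 526 variable annotations. An illustrative before/after with arbitrary names:

```python
from typing import Dict, List, Optional

# Before (type comments, parsed only by type checkers):
#     results = []  # type: List[str]
#     current_update = None  # type: Optional[str]

# After (PEP 526 annotations, part of the syntax itself and visible at
# runtime via __annotations__):
results: List[str] = []
current_update: Optional[str] = None
handlers: Dict[str, int] = {}
```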
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 9f0d64a325..6ddafe5434 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -25,6 +25,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.search import SearchStore +from synapse.storage.types import Cursor from synapse.types import JsonDict, ThirdPartyInstanceID from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -1022,10 +1023,22 @@ class RoomWorkerStore(SQLBaseStore): ) -class RoomBackgroundUpdateStore(SQLBaseStore): +class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column" + POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2" + REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth" + + +_REPLACE_ROOM_DEPTH_SQL_COMMANDS = ( + "DROP TRIGGER populate_min_depth2_trigger ON room_depth", + "DROP FUNCTION populate_min_depth2()", + "ALTER TABLE room_depth DROP COLUMN min_depth", + "ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth", +) + +class RoomBackgroundUpdateStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) @@ -1037,15 +1050,25 @@ class RoomBackgroundUpdateStore(SQLBaseStore): ) self.db_pool.updates.register_background_update_handler( - self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, + _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, self._remove_tombstoned_rooms_from_directory, ) self.db_pool.updates.register_background_update_handler( - self.ADD_ROOMS_ROOM_VERSION_COLUMN, + _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN, self._background_add_rooms_room_version_column, ) + # BG updates to change the type of room_depth.min_depth + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2, + self._background_populate_room_depth_min_depth2, + ) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH, + self._background_replace_room_depth_min_depth, + ) + async def _background_insert_retention(self, progress, batch_size): """Retrieves a list of all rooms within a range and inserts an entry for each of them into the room_retention table. 
@@ -1164,7 +1187,9 @@ class RoomBackgroundUpdateStore(SQLBaseStore): new_last_room_id = room_id self.db_pool.updates._background_update_progress_txn( - txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id} + txn, + _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN, + {"room_id": new_last_room_id}, ) return False @@ -1176,7 +1201,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): if end: await self.db_pool.updates._end_background_update( - self.ADD_ROOMS_ROOM_VERSION_COLUMN + _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN ) return batch_size @@ -1215,7 +1240,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): if not rooms: await self.db_pool.updates._end_background_update( - self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE + _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE ) return 0 @@ -1224,7 +1249,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): await self.set_room_is_public(room_id, False) await self.db_pool.updates._background_update_progress( - self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]} + _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]} ) return len(rooms) @@ -1268,6 +1293,71 @@ class RoomBackgroundUpdateStore(SQLBaseStore): return max_ordering is None + async def _background_populate_room_depth_min_depth2( + self, progress: JsonDict, batch_size: int + ) -> int: + """Populate room_depth.min_depth2 + + This is to deal with the fact that min_depth was initially created as a + 32-bit integer field. + """ + + def process(txn: Cursor) -> int: + last_room = progress.get("last_room", "") + txn.execute( + """ + UPDATE room_depth SET min_depth2=min_depth + WHERE room_id IN ( + SELECT room_id FROM room_depth WHERE room_id > ? + ORDER BY room_id LIMIT ? + ) + RETURNING room_id; + """, + (last_room, batch_size), + ) + row_count = txn.rowcount + if row_count == 0: + return 0 + last_room = max(row[0] for row in txn) + logger.info("populated room_depth up to %s", last_room) + + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2, + {"last_room": last_room}, + ) + return row_count + + result = await self.db_pool.runInteraction( + "_background_populate_min_depth2", process + ) + + if result != 0: + return result + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2 + ) + return 0 + + async def _background_replace_room_depth_min_depth( + self, progress: JsonDict, batch_size: int + ) -> int: + """Drop the old 'min_depth' column and rename 'min_depth2' into its place.""" + + def process(txn: Cursor) -> None: + for sql in _REPLACE_ROOM_DEPTH_SQL_COMMANDS: + logger.info("completing room_depth migration: %s", sql) + txn.execute(sql) + + await self.db_pool.runInteraction("_background_replace_room_depth", process) + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH, + ) + + return 0 + class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): def __init__(self, database: DatabasePool, db_conn, hs): diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 4eebe5257a..b6cb6836c2 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -649,7 +649,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): event_to_memberships = await self._get_joined_profiles_from_event_ids( missing_member_event_ids ) - users_in_room.update((row for row in event_to_memberships.values() 
if row)) + users_in_room.update(row for row in event_to_memberships.values() if row) if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -703,13 +703,22 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(max_entries=10000) async def is_host_joined(self, room_id: str, host: str) -> bool: + return await self._check_host_room_membership(room_id, host, Membership.JOIN) + + @cached(max_entries=10000) + async def is_host_invited(self, room_id: str, host: str) -> bool: + return await self._check_host_room_membership(room_id, host, Membership.INVITE) + + async def _check_host_room_membership( + self, room_id: str, host: str, membership: str + ) -> bool: if "%" in host or "_" in host: raise Exception("Invalid host name") sql = """ SELECT state_key FROM current_state_events AS c INNER JOIN room_memberships AS m USING (event_id) - WHERE m.membership = 'join' + WHERE m.membership = ? AND type = 'm.room.member' AND c.room_id = ? AND state_key LIKE ? @@ -722,7 +731,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): like_clause = "%:" + host rows = await self.db_pool.execute( - "is_host_joined", None, sql, room_id, like_clause + "is_host_joined", None, sql, membership, room_id, like_clause ) if not rows: diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index 82a1833509..59d67c255b 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -26,7 +26,6 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import StoreError from synapse.storage.database import DatabasePool from synapse.storage.databases.main.state_deltas import StateDeltasStore -from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict from synapse.util.caches.descriptors import cached @@ -49,14 +48,6 @@ ABSOLUTE_STATS_FIELDS = { "user": ("joined_rooms",), } -# these fields are per-timeslice and so should be reset to 0 upon a new slice -# You can draw these stats on a histogram. -# Example: number of events sent locally during a time slice -PER_SLICE_FIELDS = { - "room": ("total_events", "total_event_bytes"), - "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"), -} - TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} # these are the tables (& ID columns) which contain our actual subjects @@ -106,7 +97,6 @@ class StatsStore(StateDeltasStore): self.server_name = hs.hostname self.clock = self.hs.get_clock() self.stats_enabled = hs.config.stats_enabled - self.stats_bucket_size = hs.config.stats_bucket_size self.stats_delta_processing_lock = DeferredLock() @@ -122,22 +112,6 @@ class StatsStore(StateDeltasStore): self.db_pool.updates.register_noop_background_update("populate_stats_cleanup") self.db_pool.updates.register_noop_background_update("populate_stats_prepare") - def quantise_stats_time(self, ts): - """ - Quantises a timestamp to be a multiple of the bucket size. - - Args: - ts (int): the timestamp to quantise, in milliseconds since the Unix - Epoch - - Returns: - int: a timestamp which - - is divisible by the bucket size; - - is no later than `ts`; and - - is the largest such timestamp. - """ - return (ts // self.stats_bucket_size) * self.stats_bucket_size - async def _populate_stats_process_users(self, progress, batch_size): """ This is a background update which regenerates statistics for users. 
@@ -288,56 +262,6 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - async def get_statistics_for_subject( - self, stats_type: str, stats_id: str, start: str, size: int = 100 - ) -> List[dict]: - """ - Get statistics for a given subject. - - Args: - stats_type: The type of subject - stats_id: The ID of the subject (e.g. room_id or user_id) - start: Pagination start. Number of entries, not timestamp. - size: How many entries to return. - - Returns: - A list of dicts, where the dict has the keys of - ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts". - """ - return await self.db_pool.runInteraction( - "get_statistics_for_subject", - self._get_statistics_for_subject_txn, - stats_type, - stats_id, - start, - size, - ) - - def _get_statistics_for_subject_txn( - self, txn, stats_type, stats_id, start, size=100 - ): - """ - Transaction-bound version of L{get_statistics_for_subject}. - """ - - table, id_col = TYPE_TO_TABLE[stats_type] - selected_columns = list( - ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type] - ) - - slice_list = self.db_pool.simple_select_list_paginate_txn( - txn, - table + "_historical", - "end_ts", - start, - size, - retcols=selected_columns + ["bucket_size", "end_ts"], - keyvalues={id_col: stats_id}, - order_direction="DESC", - ) - - return slice_list - @cached() async def get_earliest_token_for_stats( self, stats_type: str, id: str @@ -451,14 +375,10 @@ class StatsStore(StateDeltasStore): table, id_col = TYPE_TO_TABLE[stats_type] - quantised_ts = self.quantise_stats_time(int(ts)) - end_ts = quantised_ts + self.stats_bucket_size - # Lets be paranoid and check that all the given field names are known abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] - slice_field_names = PER_SLICE_FIELDS[stats_type] for field in chain(fields.keys(), absolute_field_overrides.keys()): - if field not in abs_field_names and field not in slice_field_names: + if field not in abs_field_names: # guard against potential SQL injection dodginess raise ValueError( "%s is not a recognised field" @@ -491,20 +411,6 @@ class StatsStore(StateDeltasStore): additive_relatives=deltas_of_absolute_fields, ) - per_slice_additive_relatives = { - key: fields.get(key, 0) for key in slice_field_names - } - self._upsert_copy_from_table_with_additive_relatives_txn( - txn=txn, - into_table=table + "_historical", - keyvalues={id_col: stats_id}, - extra_dst_insvalues={"bucket_size": self.stats_bucket_size}, - extra_dst_keyvalues={"end_ts": end_ts}, - additive_relatives=per_slice_additive_relatives, - src_table=table + "_current", - copy_columns=abs_field_names, - ) - def _upsert_with_additive_relatives_txn( self, txn, table, keyvalues, absolutes, additive_relatives ): @@ -528,7 +434,7 @@ class StatsStore(StateDeltasStore): ] relative_updates = [ - "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)" % {"table": table, "field": field} for field in additive_relatives.keys() ] @@ -568,205 +474,13 @@ class StatsStore(StateDeltasStore): self.db_pool.simple_insert_txn(txn, table, merged_dict) else: for (key, val) in additive_relatives.items(): - current_row[key] += val + if current_row[key] is None: + current_row[key] = val + else: + current_row[key] += val current_row.update(absolutes) self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row) - def _upsert_copy_from_table_with_additive_relatives_txn( - self, - txn, - into_table, - keyvalues, - extra_dst_keyvalues, - extra_dst_insvalues, - 
additive_relatives, - src_table, - copy_columns, - ): - """Updates the historic stats table with latest updates. - - This involves copying "absolute" fields from the `_current` table, and - adding relative fields to any existing values. - - Args: - txn: Transaction - into_table (str): The destination table to UPSERT the row into - keyvalues (dict[str, any]): Row-identifying key values - extra_dst_keyvalues (dict[str, any]): Additional keyvalues - for `into_table`. - extra_dst_insvalues (dict[str, any]): Additional values to insert - on new row creation for `into_table`. - additive_relatives (dict[str, any]): Fields that will be added onto - if existing row present. (Must be disjoint from copy_columns.) - src_table (str): The source table to copy from - copy_columns (iterable[str]): The list of columns to copy - """ - if self.database_engine.can_native_upsert: - ins_columns = chain( - keyvalues, - copy_columns, - additive_relatives, - extra_dst_keyvalues, - extra_dst_insvalues, - ) - sel_exprs = chain( - keyvalues, - copy_columns, - ( - "?" - for _ in chain( - additive_relatives, extra_dst_keyvalues, extra_dst_insvalues - ) - ), - ) - keyvalues_where = ("%s = ?" % f for f in keyvalues) - - sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) - sets_ar = ( - "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) - for f in additive_relatives - ) - - sql = """ - INSERT INTO %(into_table)s (%(ins_columns)s) - SELECT %(sel_exprs)s - FROM %(src_table)s - WHERE %(keyvalues_where)s - ON CONFLICT (%(keyvalues)s) - DO UPDATE SET %(sets)s - """ % { - "into_table": into_table, - "ins_columns": ", ".join(ins_columns), - "sel_exprs": ", ".join(sel_exprs), - "keyvalues_where": " AND ".join(keyvalues_where), - "src_table": src_table, - "keyvalues": ", ".join( - chain(keyvalues.keys(), extra_dst_keyvalues.keys()) - ), - "sets": ", ".join(chain(sets_cc, sets_ar)), - } - - qargs = list( - chain( - additive_relatives.values(), - extra_dst_keyvalues.values(), - extra_dst_insvalues.values(), - keyvalues.values(), - ) - ) - txn.execute(sql, qargs) - else: - self.database_engine.lock_table(txn, into_table) - src_row = self.db_pool.simple_select_one_txn( - txn, src_table, keyvalues, copy_columns - ) - all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues} - dest_current_row = self.db_pool.simple_select_one_txn( - txn, - into_table, - keyvalues=all_dest_keyvalues, - retcols=list(chain(additive_relatives.keys(), copy_columns)), - allow_none=True, - ) - - if dest_current_row is None: - merged_dict = { - **keyvalues, - **extra_dst_keyvalues, - **extra_dst_insvalues, - **src_row, - **additive_relatives, - } - self.db_pool.simple_insert_txn(txn, into_table, merged_dict) - else: - for (key, val) in additive_relatives.items(): - src_row[key] = dest_current_row[key] + val - self.db_pool.simple_update_txn( - txn, into_table, all_dest_keyvalues, src_row - ) - - async def get_changes_room_total_events_and_bytes( - self, min_pos: int, max_pos: int - ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]: - """Fetches the counts of events in the given range of stream IDs. - - Args: - min_pos - max_pos - - Returns: - Mapping of room ID to field changes. 
- """ - - return await self.db_pool.runInteraction( - "stats_incremental_total_events_and_bytes", - self.get_changes_room_total_events_and_bytes_txn, - min_pos, - max_pos, - ) - - def get_changes_room_total_events_and_bytes_txn( - self, txn, low_pos: int, high_pos: int - ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]: - """Gets the total_events and total_event_bytes counts for rooms and - senders, in a range of stream_orderings (including backfilled events). - - Args: - txn - low_pos: Low stream ordering - high_pos: High stream ordering - - Returns: - The room and user deltas for total_events/total_event_bytes in the - format of `stats_id` -> fields - """ - - if low_pos >= high_pos: - # nothing to do here. - return {}, {} - - if isinstance(self.database_engine, PostgresEngine): - new_bytes_expression = "OCTET_LENGTH(json)" - else: - new_bytes_expression = "LENGTH(CAST(json AS BLOB))" - - sql = """ - SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes - FROM events INNER JOIN event_json USING (event_id) - WHERE (? < stream_ordering AND stream_ordering <= ?) - OR (? <= stream_ordering AND stream_ordering <= ?) - GROUP BY events.room_id - """ % ( - new_bytes_expression, - ) - - txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) - - room_deltas = { - room_id: {"total_events": new_events, "total_event_bytes": new_bytes} - for room_id, new_events, new_bytes in txn - } - - sql = """ - SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes - FROM events INNER JOIN event_json USING (event_id) - WHERE (? < stream_ordering AND stream_ordering <= ?) - OR (? <= stream_ordering AND stream_ordering <= ?) - GROUP BY events.sender - """ % ( - new_bytes_expression, - ) - - txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) - - user_deltas = { - user_id: {"total_events": new_events, "total_event_bytes": new_bytes} - for user_id, new_events, new_bytes in txn - if self.hs.is_mine_id(user_id) - } - - return room_deltas, user_deltas - async def _calculate_and_set_initial_state_for_room( self, room_id: str ) -> Tuple[dict, dict, int]: @@ -893,6 +607,7 @@ class StatsStore(StateDeltasStore): "invited_members": membership_counts.get(Membership.INVITE, 0), "left_members": membership_counts.get(Membership.LEAVE, 0), "banned_members": membership_counts.get(Membership.BAN, 0), + "knocked_members": membership_counts.get(Membership.KNOCK, 0), "local_users_in_room": len(local_users_in_room), }, ) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 7581c7d3ff..959f13de47 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1085,9 +1085,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): # stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and # then filtering the results. 
if from_token.topological is not None: - from_bound = ( - from_token.as_historical_tuple() - ) # type: Tuple[Optional[int], int] + from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple() elif direction == "b": from_bound = ( None, @@ -1099,7 +1097,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): from_token.stream, ) - to_bound = None # type: Optional[Tuple[Optional[int], int]] + to_bound: Optional[Tuple[Optional[int], int]] = None if to_token: if to_token.topological is not None: to_bound = to_token.as_historical_tuple() diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index 1d62c6140f..f93ff0a545 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -42,7 +42,7 @@ class TagsWorkerStore(AccountDataWorkerStore): "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"] ) - tags_by_room = {} # type: Dict[str, Dict[str, JsonDict]] + tags_by_room: Dict[str, Dict[str, JsonDict]] = {} for row in rows: room_tags = tags_by_room.setdefault(row["room_id"], {}) room_tags[row["tag"]] = db_to_json(row["content"]) diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py index 22c05cdde7..38bfdf5dad 100644 --- a/synapse/storage/databases/main/ui_auth.py +++ b/synapse/storage/databases/main/ui_auth.py @@ -224,12 +224,12 @@ class UIAuthWorkerStore(SQLBaseStore): self, txn: LoggingTransaction, session_id: str, key: str, value: Any ): # Get the current value. - result = self.db_pool.simple_select_one_txn( + result: Dict[str, Any] = self.db_pool.simple_select_one_txn( # type: ignore txn, table="ui_auth_sessions", keyvalues={"session_id": session_id}, retcols=("serverdict",), - ) # type: Dict[str, Any] # type: ignore + ) # Update it and add it back to the database. serverdict = db_to_json(result["serverdict"]) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 1882bfd9cf..20cd63c330 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta): """ ... + @property + @abc.abstractmethod + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + ... 
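The abstract `supports_returning` flag added here (implemented for PostgreSQL and SQLite in the engine hunks below) is what lets `remove_received_event_from_staging`, earlier in this diff, delete a row and read back its `received_ts` in one statement, with a select-then-delete fallback otherwise. A standalone sketch of the same gating against the standard library's sqlite3; the table and column names are illustrative:

```python
import sqlite3
from typing import Optional

# SQLite grew RETURNING support in 3.35.0; PostgreSQL has had it for years.
supports_returning = sqlite3.sqlite_version_info >= (3, 35, 0)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (origin TEXT, event_id TEXT, received_ts INTEGER)")
conn.execute("INSERT INTO staging VALUES ('example.org', '$abc', 12345)")


def remove_from_staging(origin: str, event_id: str) -> Optional[int]:
    """Delete a staged row and return its received_ts, or None if absent."""
    if supports_returning:
        # One round trip: the deleted row's timestamp comes back directly.
        row = conn.execute(
            "DELETE FROM staging WHERE origin = ? AND event_id = ? RETURNING received_ts",
            (origin, event_id),
        ).fetchone()
        return row[0] if row else None

    # Fallback for older SQLite: read the timestamp, then delete.
    row = conn.execute(
        "SELECT received_ts FROM staging WHERE origin = ? AND event_id = ?",
        (origin, event_id),
    ).fetchone()
    conn.execute(
        "DELETE FROM staging WHERE origin = ? AND event_id = ?", (origin, event_id)
    )
    return row[0] if row else None


print(remove_from_staging("example.org", "$abc"))  # 12345
```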
+
     @abc.abstractmethod
     def check_database(
         self, db_conn: ConnectionType, allow_outdated_version: bool = False
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 21411c5fea..30f948a0f7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
         """Do we support using `a = ANY(?)` and passing a list"""
         return True
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return True
+
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
             # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 5fe1b205e1..70d17d4f2c 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
         """Do we support using `a = ANY(?)` and passing a list"""
         return False
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return self.module.sqlite_version_info >= (3, 35, 0)
+
     def check_database(self, db_conn, allow_outdated_version: bool = False):
         if not allow_outdated_version:
             version = self.module.sqlite_version_info
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 051095fea9..a39877f0d5 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -307,7 +307,7 @@ class EventsPersistenceStorage:
             matched the transaction ID; the existing event is returned in such a case.
         """
-        partitioned = {}  # type: Dict[str, List[Tuple[EventBase, EventContext]]]
+        partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
             partitioned.setdefault(event.room_id, []).append((event, ctx))
@@ -384,7 +384,7 @@ class EventsPersistenceStorage:
             A dictionary of event ID to event ID we didn't persist as we already had
             another event persisted with the same TXN ID.
         """
-        replaced_events = {}  # type: Dict[str, str]
+        replaced_events: Dict[str, str] = {}
         if not events_and_contexts:
             return replaced_events
@@ -440,16 +440,14 @@ class EventsPersistenceStorage:
         # Set of remote users which were in rooms the server has left. We
         # should check if we still share any rooms and if not we mark their
         # device lists as stale.
-        potentially_left_users = set()  # type: Set[str]
+        potentially_left_users: Set[str] = set()
         if not backfilled:
             with Measure(self._clock, "_calculate_state_and_extrem"):
                 # Work out the new "current state" for each room.
                 # We do this by working out what the new extremities are and then
                 # calculating the state from that.
-                events_by_room = (
-                    {}
-                )  # type: Dict[str, List[Tuple[EventBase, EventContext]]]
+                events_by_room: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
                 for event, context in chunk:
                     events_by_room.setdefault(event.room_id, []).append(
                         (event, context)
                     )
@@ -622,9 +620,9 @@ class EventsPersistenceStorage:
                 )
                 # Remove any events which are prev_events of any existing events.
-            existing_prevs = await self.persist_events_store._get_events_which_are_prevs(
-                result
-            )  # type: Collection[str]
+            existing_prevs: Collection[
+                str
+            ] = await self.persist_events_store._get_events_which_are_prevs(result)
             result.difference_update(existing_prevs)
             # Finally handle the case where the new events have soft-failed prev
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 683e5e3b90..61392b9639 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -256,7 +256,7 @@ def _setup_new_database(
         for database in databases
     )
-    directory_entries = []  # type: List[_DirectoryListing]
+    directory_entries: List[_DirectoryListing] = []
     for directory in directories:
         directory_entries.extend(
             _DirectoryListing(file_name, os.path.join(directory, file_name))
@@ -424,10 +424,10 @@ def _upgrade_existing_database(
             directories.append(os.path.join(schema_path, database, "delta", str(v)))
     # Used to check if we have any duplicate file names
-    file_name_counter = Counter()  # type: CounterType[str]
+    file_name_counter: CounterType[str] = Counter()
     # Now find which directories have anything of interest.
-    directory_entries = []  # type: List[_DirectoryListing]
+    directory_entries: List[_DirectoryListing] = []
     for directory in directories:
         logger.debug("Looking for schema deltas in %s", directory)
         try:
@@ -639,7 +639,7 @@ def get_statements(f: Iterable[str]) -> Generator[str, None, None]:
 def executescript(txn: Cursor, schema_path: str) -> None:
-    with open(schema_path, "r") as f:
+    with open(schema_path) as f:
         execute_statements_from_stream(txn, f)
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 0a53b73ccc..36340a652a 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-SCHEMA_VERSION = 60
+SCHEMA_VERSION = 61
 """Represents the expectations made by the codebase about the database schema
 This should be incremented whenever the codebase changes its requirements on the
@@ -21,6 +21,10 @@ older versions of Synapse).
 See `README.md <synapse/storage/schema/README.md>`_ for more information on how this
 works.
+
+Changes in SCHEMA_VERSION = 61:
+ - The `user_stats_historical` and `room_stats_historical` tables are not written and
+   are not read (previously, they were written but not read).
 """
diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
index 88c9f8bd0d..0edc9fe7a2 100644
--- a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
+++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
@@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
   (6001, 'populate_stream_ordering2', '{}');
--- ... and another to build an index on it
+-- ... and some more to build indexes on it. These aren't really interdependent
+-- but the background_updates manager can only handle a single dependency per update.
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-  (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');
+  (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'),
+  (6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'),
+  (6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'),
+  (6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'),
+  (6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream');
 -- ... and another to do the switcheroo
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-  (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');
+  (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts');
diff --git a/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
new file mode 100644
index 0000000000..630c24fd9e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This migration is closely related to '01recreate_stream_ordering.sql.postgres'.
+--
+-- It updates the other tables which use an INTEGER to refer to a stream ordering.
+-- These tables are all small enough that a re-create is tractable.
+ALTER TABLE pushers ALTER COLUMN last_stream_ordering SET DATA TYPE BIGINT;
+ALTER TABLE federation_stream_position ALTER COLUMN stream_id SET DATA TYPE BIGINT;
+
+-- these aren't actually event stream orderings, but they are numbers where 2 billion
+-- is a bit limiting, application_services_state is tiny, and I don't want to ever have
+-- to do this again.
+ALTER TABLE application_services_state ALTER COLUMN last_txn SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN read_receipt_stream_id SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN presence_stream_id SET DATA TYPE BIGINT;
+
+
diff --git a/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres b/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres
new file mode 100644
index 0000000000..c8aec78e60
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- we use bigint elsewhere in the database for appservice txn ids (notably,
+-- application_services_state.last_txn), and generally we use bigints everywhere else
+-- we have monotonic counters, so let's bring this one in line.
+--
+-- assuming there aren't thousands of rows for decommissioned/non-functional ASes, this
+-- table should be pretty small, so safe to do a synchronous ALTER TABLE.
+
+ALTER TABLE application_services_txns ALTER COLUMN txn_id SET DATA TYPE BIGINT;
diff --git a/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql
new file mode 100644
index 0000000000..35ca7a40c0
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- this index is redundant; there is another UNIQUE index on this table.
+DROP INDEX IF EXISTS room_depth_room;
+
diff --git a/synapse/storage/schema/main/delta/61/03recreate_min_depth.py b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py
new file mode 100644
index 0000000000..f8d7db9f2e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py
@@ -0,0 +1,70 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This migration handles the process of changing the type of `room_depth.min_depth` to
+a BIGINT.
+"""
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.types import Cursor
+
+
+def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    if not isinstance(database_engine, PostgresEngine):
+        # this only applies to postgres - sqlite does not distinguish between big and
+        # little ints.
+        return
+
+    # First add a new column to contain the bigger min_depth
+    cur.execute("ALTER TABLE room_depth ADD COLUMN min_depth2 BIGINT")
+
+    # Create a trigger which will keep it populated.
+    cur.execute(
+        """
+        CREATE OR REPLACE FUNCTION populate_min_depth2() RETURNS trigger AS $BODY$
+        BEGIN
+            new.min_depth2 := new.min_depth;
+            RETURN NEW;
+        END;
+        $BODY$ LANGUAGE plpgsql
+        """
+    )
+
+    cur.execute(
+        """
+        CREATE TRIGGER populate_min_depth2_trigger BEFORE INSERT OR UPDATE ON room_depth
+        FOR EACH ROW
+        EXECUTE PROCEDURE populate_min_depth2()
+        """
+    )
+
+    # Start a bg process to populate it for old rooms
+    cur.execute(
+        """
+        INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+            (6103, 'populate_room_depth_min_depth2', '{}')
+        """
+    )
+
+    # and another to switch them over once it completes.
+    cur.execute(
+        """
+        INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+            (6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth_min_depth2')
+        """
+    )
+
+
+def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres b/synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres
new file mode 100644
index 0000000000..35a153da7b
--- /dev/null
+++ b/synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres
@@ -0,0 +1,34 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- By default the postgres statistics collector massively underestimates the
+-- number of distinct state groups in the `state_groups_state` table, which can
+-- cause postgres to use table scans for queries for multiple state groups.
+--
+-- To work around this we can manually tell postgres the number of distinct state
+-- groups there are by setting `n_distinct` (a negative value here is the number
+-- of distinct values divided by the number of rows, so -0.02 means on average
+-- there are 50 rows per distinct value). We don't need a particularly
+-- accurate number here, as a) we just want it to always use index scans and b)
+-- our estimate is going to be better than the one made by the statistics
+-- collector.
+
+ALTER TABLE state_groups_state ALTER COLUMN state_group SET (n_distinct = -0.02);
+
+-- Ideally we'd do an `ANALYZE state_groups_state (state_group)` here so that
+-- the above gets picked up immediately, but that can take a bit of time so we
+-- rely on the autovacuum eventually getting run and doing that in the
+-- background for us.
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c9dce726cb..f8fbba9d38 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -91,7 +91,7 @@ class StateFilter:
         Returns:
             The new state filter.
""" - type_dict = {} # type: Dict[str, Optional[Set[str]]] + type_dict: Dict[str, Optional[Set[str]]] = {} for typ, s in types: if typ in type_dict: if type_dict[typ] is None: @@ -194,7 +194,7 @@ class StateFilter: """ where_clause = "" - where_args = [] # type: List[str] + where_args: List[str] = [] if self.is_full(): return where_clause, where_args diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index f1e62f9e85..c768fdea56 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -112,7 +112,7 @@ class StreamIdGenerator: # insertion ordering will ensure its in the correct ordering. # # The key and values are the same, but we never look at the values. - self._unfinished_ids = OrderedDict() # type: OrderedDict[int, int] + self._unfinished_ids: OrderedDict[int, int] = OrderedDict() def get_next(self): """ @@ -236,15 +236,15 @@ class MultiWriterIdGenerator: # Note: If we are a negative stream then we still store all the IDs as # positive to make life easier for us, and simply negate the IDs when we # return them. - self._current_positions = {} # type: Dict[str, int] + self._current_positions: Dict[str, int] = {} # Set of local IDs that we're still processing. The current position # should be less than the minimum of this set (if not empty). - self._unfinished_ids = set() # type: Set[int] + self._unfinished_ids: Set[int] = set() # Set of local IDs that we've processed that are larger than the current # position, due to there being smaller unpersisted IDs. - self._finished_ids = set() # type: Set[int] + self._finished_ids: Set[int] = set() # We track the max position where we know everything before has been # persisted. This is done by a) looking at the min across all instances @@ -265,7 +265,7 @@ class MultiWriterIdGenerator: self._persisted_upto_position = ( min(self._current_positions.values()) if self._current_positions else 1 ) - self._known_persisted_positions = [] # type: List[int] + self._known_persisted_positions: List[int] = [] self._sequence_gen = PostgresSequenceGenerator(sequence_name) @@ -465,7 +465,7 @@ class MultiWriterIdGenerator: self._unfinished_ids.discard(next_id) self._finished_ids.add(next_id) - new_cur = None # type: Optional[int] + new_cur: Optional[int] = None if self._unfinished_ids: # If there are unfinished IDs then the new position will be the diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index 30b6b8e0ca..bb33e04fb1 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -208,10 +208,10 @@ class LocalSequenceGenerator(SequenceGenerator): get_next_id_txn; should return the curreent maximum id """ # the callback. this is cleared after it is called, so that it can be GCed. - self._callback = get_first_callback # type: Optional[GetFirstCallbackType] + self._callback: Optional[GetFirstCallbackType] = get_first_callback # The current max value, or None if we haven't looked in the DB yet. - self._current_max_id = None # type: Optional[int] + self._current_max_id: Optional[int] = None self._lock = threading.Lock() def get_next_id_txn(self, txn: Cursor) -> int: @@ -274,7 +274,7 @@ def build_sequence_generator( `check_consistency` details. """ if isinstance(database_engine, PostgresEngine): - seq = PostgresSequenceGenerator(sequence_name) # type: SequenceGenerator + seq: SequenceGenerator = PostgresSequenceGenerator(sequence_name) else: seq = LocalSequenceGenerator(get_first_callback) |