diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/_base.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 99 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_bg_updates.py | 10 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 167 | ||||
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 27 | ||||
-rw-r--r-- | synapse/storage/databases/main/rejections.py | 23 |
6 files changed, 237 insertions, 93 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index abfc56b061..b15c37679b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -36,6 +36,10 @@ class SQLBaseStore(metaclass=ABCMeta): per data store (and not one per physical database). """ + # if set to False, we will query the `state_events` and `rejections` tables when + # fetching event data. When True, we rely on it all being in the `events` table. + STATE_KEY_IN_EVENTS = False + def __init__( self, database: DatabasePool, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 2ff3d21305..6456a951bf 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -257,17 +257,30 @@ class PersistEventsStore: def _get_events_which_are_prevs_txn( txn: LoggingTransaction, batch: Collection[str] ) -> None: - sql = """ - SELECT prev_event_id, internal_metadata - FROM event_edges - INNER JOIN events USING (event_id) - LEFT JOIN rejections USING (event_id) - LEFT JOIN event_json USING (event_id) - WHERE - NOT events.outlier - AND rejections.event_id IS NULL - AND - """ + if self.store.STATE_KEY_IN_EVENTS: + sql = """ + SELECT prev_event_id, internal_metadata + FROM event_edges + INNER JOIN events USING (event_id) + LEFT JOIN event_json USING (event_id) + WHERE + NOT events.outlier + AND events.rejection_reason IS NULL + AND + """ + + else: + sql = """ + SELECT prev_event_id, internal_metadata + FROM event_edges + INNER JOIN events USING (event_id) + LEFT JOIN rejections USING (event_id) + LEFT JOIN event_json USING (event_id) + WHERE + NOT events.outlier + AND rejections.event_id IS NULL + AND + """ clause, args = make_in_list_sql_clause( self.database_engine, "prev_event_id", batch @@ -311,7 +324,19 @@ class PersistEventsStore: ) -> None: to_recursively_check = batch - while to_recursively_check: + if self.store.STATE_KEY_IN_EVENTS: + sql = """ + SELECT + event_id, prev_event_id, internal_metadata, + events.rejection_reason IS NOT NULL + FROM event_edges + INNER JOIN events USING (event_id) + LEFT JOIN event_json USING (event_id) + WHERE + NOT events.outlier + AND + """ + else: sql = """ SELECT event_id, prev_event_id, internal_metadata, @@ -325,6 +350,7 @@ class PersistEventsStore: AND """ + while to_recursively_check: clause, args = make_in_list_sql_clause( self.database_engine, "event_id", to_recursively_check ) @@ -530,6 +556,7 @@ class PersistEventsStore: event_to_room_id = {e.event_id: e.room_id for e in state_events.values()} self._add_chain_cover_index( + self.store.STATE_KEY_IN_EVENTS, txn, self.db_pool, self.store.event_chain_id_gen, @@ -541,6 +568,7 @@ class PersistEventsStore: @classmethod def _add_chain_cover_index( cls, + state_key_in_events: bool, txn: LoggingTransaction, db_pool: DatabasePool, event_chain_id_gen: SequenceGenerator, @@ -551,6 +579,8 @@ class PersistEventsStore: """Calculate the chain cover index for the given events. Args: + state_key_in_events: whether to use the `state_key` column in the `events` + table in preference to the `state_events` table event_to_room_id: Event ID to the room ID of the event event_to_types: Event ID to type and state_key of the event event_to_auth_chain: Event ID to list of auth event IDs of the @@ -610,7 +640,15 @@ class PersistEventsStore: # We loop here in case we find an out of band membership and need to # fetch their auth event info. - while missing_auth_chains: + if state_key_in_events: + sql = """ + SELECT event_id, events.type, events.state_key, chain_id, sequence_number + FROM events + LEFT JOIN event_auth_chains USING (event_id) + WHERE + events.state_key IS NOT NULL AND + """ + else: sql = """ SELECT event_id, events.type, se.state_key, chain_id, sequence_number FROM events @@ -618,6 +656,8 @@ class PersistEventsStore: LEFT JOIN event_auth_chains USING (event_id) WHERE """ + + while missing_auth_chains: clause, args = make_in_list_sql_clause( txn.database_engine, "event_id", @@ -1641,22 +1681,31 @@ class PersistEventsStore: ) -> None: to_prefill = [] - rows = [] - ev_map = {e.event_id: e for e, _ in events_and_contexts} if not ev_map: return - sql = ( - "SELECT " - " e.event_id as event_id, " - " r.redacts as redacts," - " rej.event_id as rejects " - " FROM events as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE " - ) + if self.store.STATE_KEY_IN_EVENTS: + sql = ( + "SELECT " + " e.event_id as event_id, " + " r.redacts as redacts," + " e.rejection_reason as rejects " + " FROM events as e" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE " + ) + else: + sql = ( + "SELECT " + " e.event_id as event_id, " + " r.redacts as redacts," + " rej.event_id as rejects " + " FROM events as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE " + ) clause, args = make_in_list_sql_clause( self.database_engine, "e.event_id", list(ev_map) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 7d951d85d6..8746bc493e 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -448,6 +448,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): # First, we get `batch_size` events from the table, pulling out # their successor events, if any, and the successor events' # rejection status. + + # this should happen before the bg update which drops 'rejections' + assert not self.STATE_KEY_IN_EVENTS + txn.execute( """SELECT prev_event_id, event_id, internal_metadata, rejections.event_id IS NOT NULL, events.outlier @@ -973,6 +977,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): extra_clause = "AND events.room_id = ?" tuple_args.append(last_room_id) + # this should happen before the bg update which drops 'state_events' + assert not self.STATE_KEY_IN_EVENTS + sql = """ SELECT event_id, state_events.type, state_events.state_key, @@ -1041,9 +1048,10 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): # Calculate and persist the chain cover index for this set of events. # - # Annoyingly we need to gut wrench into the persit event store so that + # Annoyingly we need to gut wrench into the persist event store so that # we can reuse the function to calculate the chain cover for rooms. PersistEventsStore._add_chain_cover_index( + False, txn, self.db_pool, self.event_chain_id_gen, # type: ignore[attr-defined] diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b99b107784..7cd94f3964 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1482,20 +1482,35 @@ class EventsWorkerStore(SQLBaseStore): def get_all_new_forward_event_rows( txn: LoggingTransaction, ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: - sql = ( - "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" - " FROM events AS e" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events AS se USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " LEFT JOIN room_memberships USING (event_id)" - " LEFT JOIN rejections USING (event_id)" - " WHERE ? < stream_ordering AND stream_ordering <= ?" - " AND instance_name = ?" - " ORDER BY stream_ordering ASC" - " LIMIT ?" - ) + if self.STATE_KEY_IN_EVENTS: + sql = ( + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," + " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " AND instance_name = ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + else: + sql = ( + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events AS se USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " LEFT JOIN rejections USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " AND instance_name = ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, instance_name, limit)) return cast( List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall() @@ -1523,21 +1538,36 @@ class EventsWorkerStore(SQLBaseStore): def get_ex_outlier_stream_rows_txn( txn: LoggingTransaction, ) -> List[Tuple[int, str, str, str, str, str, str, str, str]]: - sql = ( - "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" - " FROM events AS e" - " INNER JOIN ex_outlier_stream AS out USING (event_id)" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events AS se USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " LEFT JOIN room_memberships USING (event_id)" - " LEFT JOIN rejections USING (event_id)" - " WHERE ? < event_stream_ordering" - " AND event_stream_ordering <= ?" - " AND out.instance_name = ?" - " ORDER BY event_stream_ordering ASC" - ) + if self.STATE_KEY_IN_EVENTS: + sql = ( + "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," + " e.state_key, redacts, relates_to_id, membership, e.rejection_reason IS NOT NULL" + " FROM events AS e" + " INNER JOIN ex_outlier_stream AS out USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" + " AND out.instance_name = ?" + " ORDER BY event_stream_ordering ASC" + ) + else: + sql = ( + "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," + " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL" + " FROM events AS e" + " INNER JOIN ex_outlier_stream AS out USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events AS se USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " LEFT JOIN room_memberships USING (event_id)" + " LEFT JOIN rejections USING (event_id)" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" + " AND out.instance_name = ?" + " ORDER BY event_stream_ordering ASC" + ) txn.execute(sql, (last_id, current_id, instance_name)) return cast( @@ -1581,18 +1611,32 @@ class EventsWorkerStore(SQLBaseStore): def get_all_new_backfill_event_rows( txn: LoggingTransaction, ) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, str]]], int, bool]: - sql = ( - "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id" - " FROM events AS e" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events AS se USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " WHERE ? > stream_ordering AND stream_ordering >= ?" - " AND instance_name = ?" - " ORDER BY stream_ordering ASC" - " LIMIT ?" - ) + if self.STATE_KEY_IN_EVENTS: + sql = ( + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," + " e.state_key, redacts, relates_to_id" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " AND instance_name = ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + else: + sql = ( + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," + " se.state_key, redacts, relates_to_id" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events AS se USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " AND instance_name = ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (-last_id, -current_id, instance_name, limit)) new_event_updates: List[ Tuple[int, Tuple[str, str, str, str, str, str]] @@ -1611,19 +1655,34 @@ class EventsWorkerStore(SQLBaseStore): else: upper_bound = current_id - sql = ( - "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," - " se.state_key, redacts, relates_to_id" - " FROM events AS e" - " INNER JOIN ex_outlier_stream AS out USING (event_id)" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events AS se USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " WHERE ? > event_stream_ordering" - " AND event_stream_ordering >= ?" - " AND out.instance_name = ?" - " ORDER BY event_stream_ordering DESC" - ) + if self.STATE_KEY_IN_EVENTS: + sql = ( + "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," + " e.state_key, redacts, relates_to_id" + " FROM events AS e" + " INNER JOIN ex_outlier_stream AS out USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " AND out.instance_name = ?" + " ORDER BY event_stream_ordering DESC" + ) + else: + sql = ( + "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," + " se.state_key, redacts, relates_to_id" + " FROM events AS e" + " INNER JOIN ex_outlier_stream AS out USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events AS se USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " AND out.instance_name = ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (-last_id, -upper_bound, instance_name)) # Type safety: iterating over `txn` yields `Tuple`, i.e. # `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 87b0d09039..37b53de971 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -122,7 +122,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): logger.info("[purge] looking for events to delete") - should_delete_expr = "state_events.state_key IS NULL" + should_delete_expr = ( + "e.state_key IS NULL" + if self.STATE_KEY_IN_EVENTS + else "state_events.state_key IS NULL" + ) should_delete_params: Tuple[Any, ...] = () if not delete_local_events: should_delete_expr += " AND event_id NOT LIKE ?" @@ -134,12 +138,23 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # Note that we insert events that are outliers and aren't going to be # deleted, as nothing will happen to them. + if self.STATE_KEY_IN_EVENTS: + sqlf = """ + INSERT INTO events_to_purge + SELECT event_id, %s + FROM events AS e + WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ? + """ + else: + sqlf = """ + INSERT INTO events_to_purge + SELECT event_id, %s + FROM events AS e LEFT JOIN state_events USING (event_id) + WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ? + """ + txn.execute( - "INSERT INTO events_to_purge" - " SELECT event_id, %s" - " FROM events AS e LEFT JOIN state_events USING (event_id)" - " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?" - % (should_delete_expr, should_delete_expr), + sqlf % (should_delete_expr, should_delete_expr), should_delete_params, ) diff --git a/synapse/storage/databases/main/rejections.py b/synapse/storage/databases/main/rejections.py index 167318b314..f57262c51a 100644 --- a/synapse/storage/databases/main/rejections.py +++ b/synapse/storage/databases/main/rejections.py @@ -22,10 +22,19 @@ logger = logging.getLogger(__name__) class RejectionsStore(SQLBaseStore): async def get_rejection_reason(self, event_id: str) -> Optional[str]: - return await self.db_pool.simple_select_one_onecol( - table="rejections", - retcol="reason", - keyvalues={"event_id": event_id}, - allow_none=True, - desc="get_rejection_reason", - ) + if self.STATE_KEY_IN_EVENTS: + return await self.db_pool.simple_select_one_onecol( + table="events", + retcol="rejection_reason", + keyvalues={"event_id": event_id}, + allow_none=True, + desc="get_rejection_reason", + ) + else: + return await self.db_pool.simple_select_one_onecol( + table="rejections", + retcol="reason", + keyvalues={"event_id": event_id}, + allow_none=True, + desc="get_rejection_reason", + ) |