summary refs log tree commit diff
path: root/synapse/storage/databases/main/events.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2021-09-10 10:43:42 +0100
committerRichard van der Hoff <richard@matrix.org>2021-09-10 10:43:42 +0100
commit97ef48b07e79c9d0a61de5a41ef216a1dc495976 (patch)
treee8c5d0caa4655564a0406632ae4c6f778492d845 /synapse/storage/databases/main/events.py
parentRevert "Expand on why users should read upgrade notes" (diff)
parentAsk consent on SSO registration with default mxid (#10733) (diff)
downloadsynapse-97ef48b07e79c9d0a61de5a41ef216a1dc495976.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage/databases/main/events.py')
-rw-r--r--synapse/storage/databases/main/events.py134
1 files changed, 86 insertions, 48 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py

index 40b53274fb..8e691678e5 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -575,7 +575,13 @@ class PersistEventsStore: missing_auth_chains.clear() - for auth_id, event_type, state_key, chain_id, sequence_number in txn: + for ( + auth_id, + event_type, + state_key, + chain_id, + sequence_number, + ) in txn.fetchall(): event_to_types[auth_id] = (event_type, state_key) if chain_id is None: @@ -1379,18 +1385,18 @@ class PersistEventsStore: # If we're persisting an unredacted event we go and ensure # that we mark any redactions that reference this event as # requiring censoring. - sql = "UPDATE redactions SET have_censored = ? WHERE redacts = ?" - txn.execute_batch( - sql, - ( - ( - False, - event.event_id, - ) - for event, _ in events_and_contexts - if not event.internal_metadata.is_redacted() - ), + unredacted_events = [ + event.event_id + for event, _ in events_and_contexts + if not event.internal_metadata.is_redacted() + ] + sql = "UPDATE redactions SET have_censored = ? WHERE " + clause, args = make_in_list_sql_clause( + self.database_engine, + "redacts", + unredacted_events, ) + txn.execute(sql + clause, [False] + args) state_events_and_contexts = [ ec for ec in events_and_contexts if ec[0].is_state() @@ -1541,35 +1547,32 @@ class PersistEventsStore: to_prefill = [] rows = [] - N = 200 - for i in range(0, len(events_and_contexts), N): - ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]} - if not ev_map: - break - - sql = ( - "SELECT " - " e.event_id as event_id, " - " r.redacts as redacts," - " rej.event_id as rejects " - " FROM events as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE " - ) - clause, args = make_in_list_sql_clause( - self.database_engine, "e.event_id", list(ev_map) - ) + ev_map = {e.event_id: e for e, _ in events_and_contexts} + if not ev_map: + return - txn.execute(sql + clause, args) - rows = self.db_pool.cursor_to_dict(txn) - for row in rows: - event = ev_map[row["event_id"]] - if not row["rejects"] and not row["redacts"]: - to_prefill.append( - _EventCacheEntry(event=event, redacted_event=None) - ) + sql = ( + "SELECT " + " e.event_id as event_id, " + " r.redacts as redacts," + " rej.event_id as rejects " + " FROM events as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE " + ) + + clause, args = make_in_list_sql_clause( + self.database_engine, "e.event_id", list(ev_map) + ) + + txn.execute(sql + clause, args) + rows = self.db_pool.cursor_to_dict(txn) + for row in rows: + event = ev_map[row["event_id"]] + if not row["rejects"] and not row["redacts"]: + to_prefill.append(_EventCacheEntry(event=event, redacted_event=None)) def prefill(): for cache_entry in to_prefill: @@ -1770,10 +1773,21 @@ class PersistEventsStore: # Not a insertion event return - # Skip processing a insertion event if the room version doesn't - # support it. + # Skip processing an insertion event if the room version doesn't + # support it or the event is not from the room creator. room_version = self.store.get_room_version_txn(txn, event.room_id) - if not room_version.msc2716_historical: + room_creator = self.db_pool.simple_select_one_onecol_txn( + txn, + table="rooms", + keyvalues={"room_id": event.room_id}, + retcol="creator", + allow_none=True, + ) + if ( + not room_version.msc2716_historical + or not self.hs.config.experimental.msc2716_enabled + or event.sender != room_creator + ): return next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID) @@ -1822,9 +1836,20 @@ class PersistEventsStore: return # Skip processing a chunk event if the room version doesn't - # support it. + # support it or the event is not from the room creator. room_version = self.store.get_room_version_txn(txn, event.room_id) - if not room_version.msc2716_historical: + room_creator = self.db_pool.simple_select_one_onecol_txn( + txn, + table="rooms", + keyvalues={"room_id": event.room_id}, + retcol="creator", + allow_none=True, + ) + if ( + not room_version.msc2716_historical + or not self.hs.config.experimental.msc2716_enabled + or event.sender != room_creator + ): return chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID) @@ -1962,6 +1987,15 @@ class PersistEventsStore: events_and_context. """ + # Only non outlier events will have push actions associated with them, + # so let's filter them out. (This makes joining large rooms faster, as + # these queries took seconds to process all the state events). + non_outlier_events = [ + event + for event, _ in events_and_contexts + if not event.internal_metadata.is_outlier() + ] + sql = """ INSERT INTO event_push_actions ( room_id, event_id, user_id, actions, stream_ordering, @@ -1972,7 +2006,7 @@ class PersistEventsStore: WHERE event_id = ? """ - if events_and_contexts: + if non_outlier_events: txn.execute_batch( sql, ( @@ -1982,12 +2016,12 @@ class PersistEventsStore: event.depth, event.event_id, ) - for event, _ in events_and_contexts + for event in non_outlier_events ), ) room_to_event_ids: Dict[str, List[str]] = {} - for e, _ in events_and_contexts: + for e in non_outlier_events: room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) for room_id, event_ids in room_to_event_ids.items(): @@ -2012,7 +2046,11 @@ class PersistEventsStore: # persisted. txn.execute_batch( "DELETE FROM event_push_actions_staging WHERE event_id = ?", - ((event.event_id,) for event, _ in all_events_and_contexts), + ( + (event.event_id,) + for event, _ in all_events_and_contexts + if not event.internal_metadata.is_outlier() + ), ) def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):