diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 2046b8e276..6a30aa6f81 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -578,7 +578,13 @@ class PersistEventsStore:
missing_auth_chains.clear()
- for auth_id, event_type, state_key, chain_id, sequence_number in txn:
+ for (
+ auth_id,
+ event_type,
+ state_key,
+ chain_id,
+ sequence_number,
+ ) in txn.fetchall():
event_to_types[auth_id] = (event_type, state_key)
if chain_id is None:
@@ -1382,18 +1388,18 @@ class PersistEventsStore:
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
# requiring censoring.
- sql = "UPDATE redactions SET have_censored = ? WHERE redacts = ?"
- txn.execute_batch(
- sql,
- (
- (
- False,
- event.event_id,
- )
- for event, _ in events_and_contexts
- if not event.internal_metadata.is_redacted()
- ),
+ unredacted_events = [
+ event.event_id
+ for event, _ in events_and_contexts
+ if not event.internal_metadata.is_redacted()
+ ]
+ sql = "UPDATE redactions SET have_censored = ? WHERE "
+ clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "redacts",
+ unredacted_events,
)
+ txn.execute(sql + clause, [False] + args)
state_events_and_contexts = [
ec for ec in events_and_contexts if ec[0].is_state()
@@ -1773,10 +1779,21 @@ class PersistEventsStore:
# Not a insertion event
return
- # Skip processing a insertion event if the room version doesn't
- # support it.
+ # Skip processing an insertion event if the room version doesn't
+ # support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
- if not room_version.msc2716_historical:
+ room_creator = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": event.room_id},
+ retcol="creator",
+ allow_none=True,
+ )
+ if (
+ not room_version.msc2716_historical
+ or not self.hs.config.experimental.msc2716_enabled
+ or event.sender != room_creator
+ ):
return
next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
@@ -1825,9 +1842,20 @@ class PersistEventsStore:
return
# Skip processing a chunk event if the room version doesn't
- # support it.
+ # support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
- if not room_version.msc2716_historical:
+ room_creator = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": event.room_id},
+ retcol="creator",
+ allow_none=True,
+ )
+ if (
+ not room_version.msc2716_historical
+ or not self.hs.config.experimental.msc2716_enabled
+ or event.sender != room_creator
+ ):
return
chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
@@ -1848,6 +1876,18 @@ class PersistEventsStore:
},
)
+ # When we receive an event with a `chunk_id` referencing the
+ # `next_chunk_id` of the insertion event, we can remove it from the
+ # `insertion_event_extremities` table.
+ sql = """
+ DELETE FROM insertion_event_extremities WHERE event_id IN (
+ SELECT event_id FROM insertion_events
+ WHERE next_chunk_id = ?
+ )
+ """
+
+ txn.execute(sql, (chunk_id,))
+
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.
@@ -2104,15 +2144,17 @@ class PersistEventsStore:
Forward extremities are handled when we first start persisting the events.
"""
+ # From the events passed in, add all of the prev events as backwards extremities.
+ # Ignore any events that are already backwards extrems or outliers.
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
- " AND outlier = ?"
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
" )"
)
@@ -2126,6 +2168,8 @@ class PersistEventsStore:
],
)
+ # Delete all these events that we've already fetched and now know that their
+ # prev events are the new backwards extremeties.
query = (
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
|