diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index e2e6eb479f..5c9db7554e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1664,9 +1664,6 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
- self._handle_insertion_event(txn, event)
- self._handle_batch_event(txn, event)
-
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
@@ -1729,13 +1726,22 @@ class PersistEventsStore:
if not row["rejects"] and not row["redacts"]:
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
- async def prefill() -> None:
+ async def external_prefill() -> None:
+ for cache_entry in to_prefill:
+ await self.store._get_event_cache.set_external(
+ (cache_entry.event.event_id,), cache_entry
+ )
+
+ def local_prefill() -> None:
for cache_entry in to_prefill:
- await self.store._get_event_cache.set(
+ self.store._get_event_cache.set_local(
(cache_entry.event.event_id,), cache_entry
)
- txn.async_call_after(prefill)
+ # The order these are called here is not as important as knowing that after the
+ # transaction is finished, the async_call_after will run before the call_after.
+ txn.async_call_after(external_prefill)
+ txn.call_after(local_prefill)
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
assert event.redacts is not None
@@ -1918,128 +1924,6 @@ class PersistEventsStore:
),
)
- def _handle_insertion_event(
- self, txn: LoggingTransaction, event: EventBase
- ) -> None:
- """Handles keeping track of insertion events and edges/connections.
- Part of MSC2716.
-
- Args:
- txn: The database transaction object
- event: The event to process
- """
-
- if event.type != EventTypes.MSC2716_INSERTION:
- # Not a insertion event
- return
-
- # Skip processing an insertion event if the room version doesn't
- # support it or the event is not from the room creator.
- room_version = self.store.get_room_version_txn(txn, event.room_id)
- room_creator = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="rooms",
- keyvalues={"room_id": event.room_id},
- retcol="creator",
- allow_none=True,
- )
- if not room_version.msc2716_historical and (
- not self.hs.config.experimental.msc2716_enabled
- or event.sender != room_creator
- ):
- return
-
- next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
- if next_batch_id is None:
- # Invalid insertion event without next batch ID
- return
-
- logger.debug(
- "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
- )
-
- # Keep track of the insertion event and the batch ID
- self.db_pool.simple_insert_txn(
- txn,
- table="insertion_events",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "next_batch_id": next_batch_id,
- },
- )
-
- # Insert an edge for every prev_event connection
- for prev_event_id in event.prev_event_ids():
- self.db_pool.simple_insert_txn(
- txn,
- table="insertion_event_edges",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "insertion_prev_event_id": prev_event_id,
- },
- )
-
- def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None:
- """Handles inserting the batch edges/connections between the batch event
- and an insertion event. Part of MSC2716.
-
- Args:
- txn: The database transaction object
- event: The event to process
- """
-
- if event.type != EventTypes.MSC2716_BATCH:
- # Not a batch event
- return
-
- # Skip processing a batch event if the room version doesn't
- # support it or the event is not from the room creator.
- room_version = self.store.get_room_version_txn(txn, event.room_id)
- room_creator = self.db_pool.simple_select_one_onecol_txn(
- txn,
- table="rooms",
- keyvalues={"room_id": event.room_id},
- retcol="creator",
- allow_none=True,
- )
- if not room_version.msc2716_historical and (
- not self.hs.config.experimental.msc2716_enabled
- or event.sender != room_creator
- ):
- return
-
- batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
- if batch_id is None:
- # Invalid batch event without a batch ID
- return
-
- logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
-
- # Keep track of the insertion event and the batch ID
- self.db_pool.simple_insert_txn(
- txn,
- table="batch_events",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "batch_id": batch_id,
- },
- )
-
- # When we receive an event with a `batch_id` referencing the
- # `next_batch_id` of the insertion event, we can remove it from the
- # `insertion_event_extremities` table.
- sql = """
- DELETE FROM insertion_event_extremities WHERE event_id IN (
- SELECT event_id FROM insertion_events
- WHERE next_batch_id = ?
- )
- """
-
- txn.execute(sql, (batch_id,))
-
def _handle_redact_relations(
self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
) -> None:
|