diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index d0e015bf19..f5131fe291 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -332,6 +332,7 @@ class EventsPersistenceStorageController:
# store for now.
self.main_store = stores.main
self.state_store = stores.state
+ self._state_deletion_store = stores.state_deletion
assert stores.persist_events
self.persist_events_store = stores.persist_events
@@ -416,7 +417,7 @@ class EventsPersistenceStorageController:
set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
async def enqueue(
- item: Tuple[str, List[Tuple[EventBase, EventContext]]]
+ item: Tuple[str, List[Tuple[EventBase, EventContext]]],
) -> Dict[str, str]:
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
@@ -502,8 +503,15 @@ class EventsPersistenceStorageController:
"""
state = await self._calculate_current_state(room_id)
delta = await self._calculate_state_delta(room_id, state)
+ sliding_sync_table_changes = (
+ await self.persist_events_store._calculate_sliding_sync_table_changes(
+ room_id, [], delta
+ )
+ )
- await self.persist_events_store.update_current_state(room_id, delta)
+ await self.persist_events_store.update_current_state(
+ room_id, delta, sliding_sync_table_changes
+ )
async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
"""Calculate the current state of a room, based on the forward extremities
@@ -542,7 +550,9 @@ class EventsPersistenceStorageController:
room_version,
state_maps_by_state_group,
event_map=None,
- state_res_store=StateResolutionStore(self.main_store),
+ state_res_store=StateResolutionStore(
+ self.main_store, self._state_deletion_store
+ ),
)
return await res.get_state(self._state_controller, StateFilter.all())
@@ -628,15 +638,20 @@ class EventsPersistenceStorageController:
room_id, [e for e, _ in chunk]
)
- await self.persist_events_store._persist_events_and_state_updates(
- room_id,
- chunk,
- state_delta_for_room=state_delta_for_room,
- new_forward_extremities=new_forward_extremities,
- use_negative_stream_ordering=backfilled,
- inhibit_local_membership_updates=backfilled,
- new_event_links=new_event_links,
- )
+ # Stop the state groups from being deleted while we're persisting
+ # them.
+ async with self._state_deletion_store.persisting_state_group_references(
+ events_and_contexts
+ ):
+ await self.persist_events_store._persist_events_and_state_updates(
+ room_id,
+ chunk,
+ state_delta_for_room=state_delta_for_room,
+ new_forward_extremities=new_forward_extremities,
+ use_negative_stream_ordering=backfilled,
+ inhibit_local_membership_updates=backfilled,
+ new_event_links=new_event_links,
+ )
return replaced_events
@@ -785,9 +800,9 @@ class EventsPersistenceStorageController:
)
# Remove any events which are prev_events of any existing events.
- existing_prevs: Collection[str] = (
- await self.persist_events_store._get_events_which_are_prevs(result)
- )
+ existing_prevs: Collection[
+ str
+ ] = await self.persist_events_store._get_events_which_are_prevs(result)
result.difference_update(existing_prevs)
# Finally handle the case where the new events have soft-failed prev
@@ -855,8 +870,7 @@ class EventsPersistenceStorageController:
# This should only happen for outlier events.
if not ev.internal_metadata.is_outlier():
raise Exception(
- "Context for new event %s has no state "
- "group" % (ev.event_id,)
+ "Context for new event %s has no state group" % (ev.event_id,)
)
continue
if ctx.state_group_deltas:
@@ -958,7 +972,9 @@ class EventsPersistenceStorageController:
room_version,
state_groups,
events_map,
- state_res_store=StateResolutionStore(self.main_store),
+ state_res_store=StateResolutionStore(
+ self.main_store, self._state_deletion_store
+ ),
)
state_resolutions_during_persistence.inc()
|