summary refs log tree commit diff
path: root/synapse/storage/controllers/persist_events.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/storage/controllers/persist_events.py52
1 files changed, 34 insertions, 18 deletions
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py

index d0e015bf19..f5131fe291 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py
@@ -332,6 +332,7 @@ class EventsPersistenceStorageController: # store for now. self.main_store = stores.main self.state_store = stores.state + self._state_deletion_store = stores.state_deletion assert stores.persist_events self.persist_events_store = stores.persist_events @@ -416,7 +417,7 @@ class EventsPersistenceStorageController: set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) async def enqueue( - item: Tuple[str, List[Tuple[EventBase, EventContext]]] + item: Tuple[str, List[Tuple[EventBase, EventContext]]], ) -> Dict[str, str]: room_id, evs_ctxs = item return await self._event_persist_queue.add_to_queue( @@ -502,8 +503,15 @@ class EventsPersistenceStorageController: """ state = await self._calculate_current_state(room_id) delta = await self._calculate_state_delta(room_id, state) + sliding_sync_table_changes = ( + await self.persist_events_store._calculate_sliding_sync_table_changes( + room_id, [], delta + ) + ) - await self.persist_events_store.update_current_state(room_id, delta) + await self.persist_events_store.update_current_state( + room_id, delta, sliding_sync_table_changes + ) async def _calculate_current_state(self, room_id: str) -> StateMap[str]: """Calculate the current state of a room, based on the forward extremities @@ -542,7 +550,9 @@ class EventsPersistenceStorageController: room_version, state_maps_by_state_group, event_map=None, - state_res_store=StateResolutionStore(self.main_store), + state_res_store=StateResolutionStore( + self.main_store, self._state_deletion_store + ), ) return await res.get_state(self._state_controller, StateFilter.all()) @@ -628,15 +638,20 @@ class EventsPersistenceStorageController: room_id, [e for e, _ in chunk] ) - await self.persist_events_store._persist_events_and_state_updates( - room_id, - chunk, - state_delta_for_room=state_delta_for_room, - new_forward_extremities=new_forward_extremities, - use_negative_stream_ordering=backfilled, - inhibit_local_membership_updates=backfilled, - new_event_links=new_event_links, - ) + # Stop the state groups from being deleted while we're persisting + # them. + async with self._state_deletion_store.persisting_state_group_references( + events_and_contexts + ): + await self.persist_events_store._persist_events_and_state_updates( + room_id, + chunk, + state_delta_for_room=state_delta_for_room, + new_forward_extremities=new_forward_extremities, + use_negative_stream_ordering=backfilled, + inhibit_local_membership_updates=backfilled, + new_event_links=new_event_links, + ) return replaced_events @@ -785,9 +800,9 @@ class EventsPersistenceStorageController: ) # Remove any events which are prev_events of any existing events. - existing_prevs: Collection[str] = ( - await self.persist_events_store._get_events_which_are_prevs(result) - ) + existing_prevs: Collection[ + str + ] = await self.persist_events_store._get_events_which_are_prevs(result) result.difference_update(existing_prevs) # Finally handle the case where the new events have soft-failed prev @@ -855,8 +870,7 @@ class EventsPersistenceStorageController: # This should only happen for outlier events. if not ev.internal_metadata.is_outlier(): raise Exception( - "Context for new event %s has no state " - "group" % (ev.event_id,) + "Context for new event %s has no state group" % (ev.event_id,) ) continue if ctx.state_group_deltas: @@ -958,7 +972,9 @@ class EventsPersistenceStorageController: room_version, state_groups, events_map, - state_res_store=StateResolutionStore(self.main_store), + state_res_store=StateResolutionStore( + self.main_store, self._state_deletion_store + ), ) state_resolutions_during_persistence.inc()