diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index c801a93b5b..f855903c39 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -80,6 +80,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
+ self._instance_name: str = hs.get_instance_name()
async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
@@ -404,18 +405,21 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
context: EventContext,
) -> None:
"""Update the state group for a partial state event"""
- await self.db_pool.runInteraction(
- "update_state_for_partial_state_event",
- self._update_state_for_partial_state_event_txn,
- event,
- context,
- )
+ async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id:
+ await self.db_pool.runInteraction(
+ "update_state_for_partial_state_event",
+ self._update_state_for_partial_state_event_txn,
+ event,
+ context,
+ un_partial_state_event_stream_id,
+ )
def _update_state_for_partial_state_event_txn(
self,
txn: LoggingTransaction,
event: EventBase,
context: EventContext,
+ un_partial_state_event_stream_id: int,
) -> None:
# we shouldn't have any outliers here
assert not event.internal_metadata.is_outlier()
@@ -436,7 +440,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# the event may now be rejected where it was not before, or vice versa,
# in which case we need to update the rejected flags.
- if bool(context.rejected) != (event.rejected_reason is not None):
+ rejection_status_changed = bool(context.rejected) != (
+ event.rejected_reason is not None
+ )
+ if rejection_status_changed:
self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
self.db_pool.simple_delete_one_txn(
@@ -445,8 +452,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
keyvalues={"event_id": event.event_id},
)
- # TODO(faster_joins): need to do something about workers here
- # https://github.com/matrix-org/synapse/issues/12994
txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
txn.call_after(
self._get_state_group_for_event.prefill,
@@ -454,6 +459,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
state_group,
)
+ self.db_pool.simple_insert_txn(
+ txn,
+ "un_partial_stated_event_stream",
+ {
+ "stream_id": un_partial_state_event_stream_id,
+ "instance_name": self._instance_name,
+ "event_id": event.event_id,
+ "rejection_status_changed": rejection_status_changed,
+ },
+ )
+
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
|